Guaranteed delivery of data (upload, email etc) across a network

This flow demonstrates how to achieve guaranteed delivery of data across a network where it may not always be possible to send the data, for example because of a network outage or an unavailable server. When sending is not possible, the data is buffered in a queue and transmitted later. If persistent context is configured, the queue can be held there so that it survives a Node-RED restart or power down.

This example shows how to use it for sending email, but it can easily be modified to handle other similar processes. Of course, the 'guaranteed delivery' in this case only means that the message reached the email server, not that it reached the recipient. Pass it messages suitable for the email node by sending them to the link labelled Send email messages to this link.

To try it, edit the Send test email node and enter an appropriate email recipient in msg.to. Edit the email node and enter your email server and credentials. If the email delivery (to the server) is successful then it will wait for more messages. If the delivery fails then it will retry approximately every 60 seconds until it succeeds. In the meantime any additional requests will be queued and will be sent when possible.
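The behaviour described above can be sketched as a small state machine. The following is a simplified standalone model, loosely based on the State machine function node in the flow below, not a copy of it. The names createDelivery and send are hypothetical, and the periodic retry is represented by calling trigger() by hand rather than by a repeating inject node.

```javascript
// Simplified model of the queue and its states (assumed names, for illustration only).
function createDelivery(send) {
    const stat = { state: "waitingForMsg", queue: [] };

    // Send the head of the queue, if any, but leave it queued until OK arrives.
    function trySend() {
        if (stat.state !== "waitingForMsg") return;
        if (stat.queue.length > 0) {
            send(stat.queue[0]);
            stat.state = "waitingForOKFail";
        }
    }

    // A trigger only matters while waiting to retry after a failure.
    function wake() {
        if (stat.state === "waitingForTrigger") stat.state = "waitingForMsg";
    }

    return {
        stat,
        // New message: queue it, then behave as if a trigger had arrived.
        message(msg) { stat.queue.push(msg); wake(); trySend(); },
        // Delivery confirmed: drop the head and move on to the next message.
        ok() {
            if (stat.state !== "waitingForOKFail") return;
            stat.queue.shift();
            stat.state = "waitingForMsg";
            trySend();
        },
        // Delivery failed: keep the head queued and wait for the next trigger.
        fail() {
            if (stat.state === "waitingForOKFail") stat.state = "waitingForTrigger";
        },
        // Periodic retry tick.
        trigger() { wake(); trySend(); }
    };
}
```

In this model a failed message is sent again on the next trigger, and messages that arrive while a delivery is outstanding simply wait their turn in the queue.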

Double-click the Guaranteed delivery node to see the configuration options available.

The subflow has to be informed whether each delivery succeeded or failed. This is done by passing in a message with a property set to a value that indicates success or failure. The property name and the success and fail values are configurable. The default property is control and the default values are OK and FAIL. Choose a property name that will not conflict with any properties in the messages to be sent. Look at the OK and FAIL nodes in the flow to see how the property is set up ready for passing back to the subflow.
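To illustrate why the property name matters, here is a minimal sketch of how a message might be tagged and then recognised. The helper names markResult and classify and the defaults object are hypothetical. The values control, OK, FAIL and __trigger are the ones used in the flow below.

```javascript
// Default settings, matching the subflow's default properties.
const defaults = { controlProperty: "control", okValue: "OK", failValue: "FAIL" };

// Return a copy of msg tagged with the success or failure value.
function markResult(msg, succeeded, cfg = defaults) {
    const value = succeeded ? cfg.okValue : cfg.failValue;
    return { ...msg, [cfg.controlProperty]: value };
}

// Decide what kind of message has arrived at the subflow input.
function classify(msg, cfg = defaults) {
    const control = msg[cfg.controlProperty];
    if (control === cfg.okValue) return "ok";
    if (control === cfg.failValue) return "fail";
    if (control === "__trigger") return "trigger";
    return "message"; // anything else is data to be queued
}
```

A data message that already carries control set to OK would be classified as a control message rather than queued, which is why the property name should not clash with anything in the messages being delivered.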

Following a failure, the attempt is retried at the Retry Period, which is configurable in the subflow properties. The default is 60 seconds.

If a persistent context store is available then it may also be selected in the properties. Messages currently in the queue will then not be lost if Node-RED is restarted or the machine reboots.
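For reference, context stores are declared in the Node-RED settings file. The fragment below is a minimal sketch, assuming a store named file (the name is an arbitrary choice for illustration) backed by the built-in localfilesystem module, alongside an in-memory default.

```javascript
// Fragment of settings.js (only the relevant key is shown).
module.exports = {
    contextStorage: {
        // In-memory store, lost on restart.
        default: { module: "memory" },
        // Store written to disk. "file" is an assumed name.
        file: { module: "localfilesystem" }
    }
};
```

With a configuration like this, entering file in the subflow's Context Store to use property would place the queue in the store that is written to disk.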

The maximum number of messages in the queue may be specified in the subflow properties. Once the queue is full, the oldest message is discarded each time a new one is passed in. If the maximum is set to zero there is no limit.
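The queue limit can be sketched as follows. This is a standalone illustration modelled on the handleMessage function in the flow below. The helper name enqueue is hypothetical.

```javascript
// Add msg to the queue, dropping the oldest entry if the limit is exceeded.
// A maxQueue of zero (or a non-numeric value) means no limit.
function enqueue(queue, msg, maxQueue) {
    queue.push(msg);
    const max = Number(maxQueue);
    if (!isNaN(max) && max > 0 && queue.length > max) {
        queue.shift(); // discard the oldest message
    }
    return queue;
}
```

For example, adding three messages in turn with a limit of two leaves only the two most recent in the queue.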

To modify the flow to deliver to something other than email, remove the nodes below the Guaranteed delivery node and replace them with nodes that perform the required delivery action. When the delivery succeeds, send an OK control message back to the node; if it fails, send a FAIL message back, in the same way as in the sample flow. It is imperative that exactly one of OK or FAIL is sent for each delivery attempt, never both.
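If the replacement delivery logic lives in custom code that has several exit paths, one way to make sure that only a single result is reported is a small guard such as the one below. This is a sketch under assumptions, not part of the flow: makeOneShotReporter and send are hypothetical names, and the defaults mirror the subflow's default property and values.

```javascript
// Create a reporter for one delivery attempt. Only the first call to
// ok() or fail() is forwarded; later calls are ignored.
function makeOneShotReporter(send, property = "control", okValue = "OK", failValue = "FAIL") {
    let reported = false;

    function report(msg, value) {
        if (reported) return false; // a result was already sent for this attempt
        reported = true;
        send({ ...msg, [property]: value });
        return true;
    }

    return {
        ok: (msg) => report(msg, okValue),
        fail: (msg) => report(msg, failValue)
    };
}
```

A new reporter would be created for each delivery attempt, with send being whatever passes the control message back to the Guaranteed delivery node.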

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    
case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger 
state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"40032b86.16b0dc","type":"link out","z":"6d810db7.640bf4","name":"","links":["eada5d9a.8c60e"],"x":675,"y":360,"wires":[]},{"id":"9d7e52ff.9373a","type":"change","z":"6d810db7.640bf4","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":510,"y":360,"wires":[["40032b86.16b0dc"]]},{"id":"98d72bd3.c5c98","type":"change","z":"6d810db7.640bf4","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":510,"y":400,"wires":[["40032b86.16b0dc"]]},{"id":"78bcff9d.8c29c","type":"inject","z":"6d810db7.640bf4","name":"Send test email","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"to","v":"[email protected]","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"email subject","payload":"This is the email 
contents","payloadType":"str","x":300,"y":80,"wires":[["52c5b999.776288"]]},{"id":"eada5d9a.8c60e","type":"link in","z":"6d810db7.640bf4","name":"","links":["40032b86.16b0dc"],"x":135,"y":320,"wires":[["406fad44.dfcbdc"]]},{"id":"be8e3686.b3db4","type":"e-mail","z":"6d810db7.640bf4","server":"your.email.server.com","port":"465","secure":true,"tls":false,"name":"","dname":"","x":310,"y":300,"wires":[]},{"id":"87580f77.38e0a","type":"complete","z":"6d810db7.640bf4","name":"","scope":["be8e3686.b3db4"],"uncaught":false,"x":330,"y":360,"wires":[["9d7e52ff.9373a"]]},{"id":"75675144.54ee98","type":"catch","z":"6d810db7.640bf4","name":"","scope":["be8e3686.b3db4"],"uncaught":false,"x":310,"y":400,"wires":[["98d72bd3.c5c98"]]},{"id":"26ef1404.562c74","type":"link in","z":"6d810db7.640bf4","name":"Email delivery","links":["52c5b999.776288"],"x":55,"y":220,"wires":[["406fad44.dfcbdc"]]},{"id":"52c5b999.776288","type":"link out","z":"6d810db7.640bf4","name":"","links":["26ef1404.562c74"],"x":415,"y":80,"wires":[]},{"id":"a246571e.945ae8","type":"comment","z":"6d810db7.640bf4","name":"Send email messages to this link","info":"","x":150,"y":180,"wires":[]},{"id":"406fad44.dfcbdc","type":"subflow:149380c1.63e107","z":"6d810db7.640bf4","name":"Guaranteed delivery","env":[],"x":320,"y":220,"wires":[["be8e3686.b3db4"]]}]

Flow Info

Created 1 year, 1 month ago
Updated 2 months, 2 weeks ago

Node Types

Core
  • catch (x1)
  • change (x2)
  • comment (x1)
  • complete (x1)
  • function (x1)
  • inject (x2)
  • link in (x2)
  • link out (x2)
  • status (x1)
  • subflow (x1)
Other
  • e-mail (x1)
  • subflow:149380c1.63e107 (x1)

Tags

  • email
Copy this flow JSON to your clipboard and then import into Node-RED using the Import From > Clipboard (Ctrl-I) menu option