Simple Status gated queue

This simple flow shows how you can use the status node as a gate for a simple function node that acts as a queue for messages being sent to an mqtt node.

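While the mqtt node's status is not connected, the function node stores incoming messages up to a maximum depth (MAX_DEPTH in the function node, currently set to 10). Once the queue is full, the oldest stored message is dropped to make room for each new one.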

When the mqtt node's status changes to connected, all the stored messages are sent to the mqtt node for delivery, oldest first, and later messages pass straight through.
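
The gating behaviour described above can be sketched as a plain JavaScript function that runs outside Node-RED. This is only an illustration of the logic, not the function node itself: createGate and its send callback are hypothetical names, and the sketch flushes the queue immediately, whereas the function node in the flow defers the flush by 5 ms with setTimeout. It assumes the status text contains ".connected" when the mqtt node is connected, which is what the function node checks for.

```javascript
// Minimal sketch of a status-gated queue (hypothetical helper, not a Node-RED API).
// maxDepth: maximum number of messages to hold while the gate is closed.
// send:     callback standing in for node.send() in a function node.
function createGate(maxDepth, send) {
    const queue = [];
    let pass = false; // gate starts closed until a "connected" status arrives

    return function handle(msg) {
        if (Object.prototype.hasOwnProperty.call(msg, "status")) {
            // Message from the status node: open or close the gate.
            const text = String((msg.status && msg.status.text) || "");
            if (text.indexOf(".connected") !== -1) {
                // Flush everything stored while disconnected, oldest first.
                while (queue.length > 0) {
                    send(queue.shift());
                }
                pass = true;
            } else {
                pass = false;
            }
            return;
        }
        if (pass) {
            // Gate open: forward the message unchanged.
            send(msg);
            return;
        }
        // Gate closed: store the message, dropping the oldest if over depth.
        queue.push(msg);
        if (queue.length > maxDepth) {
            queue.shift();
        }
    };
}
```

With a depth of 2, feeding three ordinary messages and then a connected status forwards only the last two, because the oldest was dropped when the queue overflowed.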

The same technique could be extended to other node types that report their connection state through a status.

Note: the mqtt node may only detect a broken connection after its keepalive fails (typically set to 60 seconds). During that period the function node is not saving messages, so you need to ensure the mqtt node is set up to manage this itself, by using QoS 2 delivery and setting clean-session to false. That way the mqtt client holds on to those messages until they have been delivered.
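
For reference, the settings mentioned in the note appear in the flow JSON as the fields below. This is an excerpt written as JavaScript object literals, with all other fields omitted; the variable names mqttOut and mqttBroker are only labels for this illustration. Note that this particular flow sets the keepalive to 20 seconds.

```javascript
// Excerpt of the relevant fields from the flow JSON (other fields omitted).
// The variable names are illustrative labels, not part of the flow.
const mqttOut = {
    type: "mqtt out",
    qos: "2"                // QoS 2 delivery
};

const mqttBroker = {
    type: "mqtt-broker",
    keepalive: "20",        // seconds before a broken connection is detected
    cleansession: false     // clean-session disabled, as the note recommends
};
```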

[{"id":"f9ec7a29.852b18","type":"inject","z":"82738787.0e0338","name":"","topic":"test","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":220,"y":600,"wires":[["8e6485b.fc3ce78"]]},{"id":"d25c0e58.e75d7","type":"mqtt out","z":"82738787.0e0338","name":"","topic":"","qos":"2","retain":"","broker":"5085b198.064c4","x":630,"y":600,"wires":[]},{"id":"ccb84c90.3f15e","type":"debug","z":"82738787.0e0338","name":"","active":false,"console":"false","complete":"false","x":650,"y":660,"wires":[]},{"id":"8e6485b.fc3ce78","type":"function","z":"82738787.0e0338","name":"Status gated queue","func":"\nvar MAX_DEPTH = 10;\n\n// if queue doesn't exist, create it\ncontext.queue = context.queue || [];\ncontext.pass = context.pass || false;\n\n// Use MQTT connected status to gate flow\nif (msg.hasOwnProperty(\"status\")) {\n    if (msg.status.text.indexOf(\".connected\") !== -1) { \n        setTimeout(function() { \n            while (context.queue.length > 0) {\n                var m = context.queue.shift();\n                node.send(m); \n            }\n            context.pass = true; \n            node.status({});\n        },5);\n    }\n    else { context.pass = false; }\n}\nelse {\n    if (context.pass) { return msg; }\n    else { \n        context.queue.push(msg); \n        if (context.queue.length > MAX_DEPTH) { context.queue.shift(); }\n        node.status({text:context.queue.length});\n    }\n}\nreturn null;","outputs":1,"noerr":0,"x":430,"y":600,"wires":[["ccb84c90.3f15e","d25c0e58.e75d7"]]},{"id":"147525f9.9ff5ba","type":"status","z":"82738787.0e0338","name":"","scope":["d25c0e58.e75d7"],"x":220,"y":660,"wires":[["8e6485b.fc3ce78"]]},{"id":"5085b198.064c4","type":"mqtt-broker","z":"","name":"","broker":"myremotebroker","port":"1883","clientid":"test","usetls":false,"compatmode":true,"keepalive":"20","cleansession":false,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Flow Info

Created 6 years, 2 months ago

Node Types

Core
  • debug (x1)
  • function (x1)
  • inject (x1)
  • mqtt out (x1)
  • mqtt-broker (x1)
  • status (x1)

Tags

  • status
  • gate
  • queue
Copy this flow JSON to your clipboard, then import it into Node-RED using the Import From > Clipboard (Ctrl-I) menu option.