Simple Status gated queue
This simple flow show how you could use the status node to act as a gate to a simple function node acting as a queue for sending messages to an mqtt
node.
When the status
is not connected the function
node stores up the messages to a maximum depth (set in the function node to 10 currently).
When the mqtt
node status changes to connected all the stored messages are then sent to the mqtt node for delivery.
Obviously this technique could be extended for use by other node types.
Note: the mqtt node may only detect the broken connection after its keepalive fails (typically set to 60 seconds), in that period the function node will not be saving messages... so you need to ensure the mqtt node is set to manage this itself, by using QoS 2
delivery and setting clean-session
to false. That way the mqtt client will itself then hang onto those messages itself until they have been delivered.
[{"id":"f9ec7a29.852b18","type":"inject","z":"82738787.0e0338","name":"","topic":"test","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":220,"y":600,"wires":[["8e6485b.fc3ce78"]]},{"id":"d25c0e58.e75d7","type":"mqtt out","z":"82738787.0e0338","name":"","topic":"","qos":"2","retain":"","broker":"5085b198.064c4","x":630,"y":600,"wires":[]},{"id":"ccb84c90.3f15e","type":"debug","z":"82738787.0e0338","name":"","active":false,"console":"false","complete":"false","x":650,"y":660,"wires":[]},{"id":"8e6485b.fc3ce78","type":"function","z":"82738787.0e0338","name":"Status gated queue","func":"\nvar MAX_DEPTH = 10;\n\n// if queue doesn't exist, create it\ncontext.queue = context.queue || [];\ncontext.pass = context.pass || false;\n\n// Use MQTT connected status to gate flow\nif (msg.hasOwnProperty(\"status\")) {\n if (msg.status.text.indexOf(\".connected\") !== -1) { \n setTimeout(function() { \n while (context.queue.length > 0) {\n var m = context.queue.shift();\n node.send(m); \n }\n context.pass = true; \n node.status({});\n },5);\n }\n else { context.pass = false; }\n}\nelse {\n if (context.pass) { return msg; }\n else { \n context.queue.push(msg); \n if (context.queue.length > MAX_DEPTH) { context.queue.shift(); }\n node.status({text:context.queue.length});\n }\n}\nreturn null;","outputs":1,"noerr":0,"x":430,"y":600,"wires":[["ccb84c90.3f15e","d25c0e58.e75d7"]]},{"id":"147525f9.9ff5ba","type":"status","z":"82738787.0e0338","name":"","scope":["d25c0e58.e75d7"],"x":220,"y":660,"wires":[["8e6485b.fc3ce78"]]},{"id":"5085b198.064c4","type":"mqtt-broker","z":"","name":"","broker":"myremotebroker","port":"1883","clientid":"test","usetls":false,"compatmode":true,"keepalive":"20","cleansession":false,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]