Store and Forward with MQTT and SQLite
This Node-RED flow stores every incoming message in an SQLite database and keeps it for 30 days. Each record is stored together with the status of the MQTT node at that moment. When the MQTT node's status changes to connected, the flow retrieves all records that were stored with any status other than connected and sends them via MQTT. It then updates the status of those records in the database so that they are not sent again.
If you have suggestions for improvement, please reach out.
[{"id":"f8abdfa5.0da4f","type":"tab","label":"Store and Forward","disabled":false,"info":""},{"id":"c4826facfa237e4a","type":"group","z":"f8abdfa5.0da4f","name":"Stores Data with MQTT Status","style":{"label":true},"nodes":["a262c79b.e38ef8","ed0fe50d.a88828","c2bba32d.bec85","3f54a85d529ceda0","a6f49b7d3b13d893"],"x":54,"y":39,"w":992,"h":122},{"id":"569c965744f3f1d2","type":"group","z":"f8abdfa5.0da4f","name":"Check Status of MQTT Set Status. If change to connected kick off sending updated messages","style":{"label":true},"nodes":["eaad946b42ab4559","62e516c469624bfe","f15320139bd9c356","1b8fae1cb765465b","d82a17b6b934cbc4"],"x":62,"y":181,"w":652,"h":142},{"id":"400afb2f8fd43e75","type":"group","z":"f8abdfa5.0da4f","name":"Check Database for messages not sent and send","style":{"label":true},"nodes":["05af3e1d555fcffa","16df61560a4558c3","eb0735439bcb9a8f","c264f041824c47f5","8756f4c9f441d703","e2459324818bd95b","8131c503fa489881"],"x":726,"y":177,"w":892,"h":202},{"id":"6a92c35bed3e664e","type":"group","z":"f8abdfa5.0da4f","name":"Update databse to messages Sent","style":{"label":true},"nodes":["f24ee3daf21307b5","60149157bd95319b","7c726d4ab7853449"],"x":942,"y":401,"w":672,"h":82},{"id":"3ac298cf0d9a130d","type":"group","z":"f8abdfa5.0da4f","name":"MQTT ","style":{"label":true},"nodes":["c4d4891d.2716f","f67c5ac935d3798d","38bd406c5515edbd"],"x":1114,"y":37,"w":594,"h":84},{"id":"17372a6ef6cb8c81","type":"group","z":"f8abdfa5.0da4f","name":"Create and Delete Table","style":{"label":true},"nodes":["955d6c9676632b1b","3df68015d2747dea","e74ca593efeaa144","32e6ff7698366843","2a7cecdbdea8517c","5ef68a466c80b23b"],"x":54,"y":439,"w":732,"h":162},{"id":"6fc28662b2c48003","type":"group","z":"f8abdfa5.0da4f","name":"Delete messages older than 30 days","style":{"label":true},"nodes":["1b8f9f07.1af6d1","7c78d13a.233a4","3837f61d.ff5022"],"x":54,"y":639,"w":702,"h":82},{"id":"523fec971c39868e","type":"group","z":"f8abdfa5.0da4f","name":"Get Results from 
Table","style":{"label":true},"nodes":["452a8c9f8136727f","47244313ddf2d925","de1c6c284ce6eab9"],"x":54,"y":739,"w":572,"h":82},{"id":"a262c79b.e38ef8","type":"inject","z":"f8abdfa5.0da4f","g":"c4826facfa237e4a","name":"Simulated Incoming Message","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"timestamp","v":"","vt":"date"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"example/topic","payload":"{\"example\": \"data\"}","payloadType":"json","x":220,"y":120,"wires":[["ed0fe50d.a88828"]]},{"id":"ed0fe50d.a88828","type":"function","z":"f8abdfa5.0da4f","g":"c4826facfa237e4a","name":"Prepare for SQLite","func":"var time = new Date().getTime(); // ISO format date-time string\n\n// msg.topic = \"INSERT INTO messages (topic, payload, mqttStatus) VALUES (?, ?, ?)\";\nmsg.topic = \"INSERT INTO messages VALUES (\" + time + \", 'Hello World', '\" + JSON.stringify(msg.payload) + \"', '\" + global.get('mqttStatus') + \"')\";\nmsg.params = [msg.topic, JSON.stringify(msg.payload), global.get('mqttStatus')];\nreturn msg;","outputs":1,"timeout":"","noerr":0,"initialize":"","finalize":"","libs":[],"x":500,"y":120,"wires":[["c2bba32d.bec85"]]},{"id":"c2bba32d.bec85","type":"sqlite","z":"f8abdfa5.0da4f","g":"c4826facfa237e4a","mydb":"66829cd1.e354a4","sqlquery":"msg.topic","sql":"CREATE TABLE IF NOT EXISTS messages (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n topic TEXT,\n payload TEXT,\n timestamp DATETIME DEFAULT CURRENT_TIMESTAMP\n);","name":"Store Message","x":700,"y":120,"wires":[["3f54a85d529ceda0","a6f49b7d3b13d893"]]},{"id":"c4d4891d.2716f","type":"mqtt out","z":"f8abdfa5.0da4f","g":"3ac298cf0d9a130d","name":"Send to MQTT","topic":"juba/test","qos":"0","retain":"true","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"9b99f9e0.924008","x":1220,"y":80,"wires":[]},{"id":"1b8f9f07.1af6d1","type":"inject","z":"f8abdfa5.0da4f","g":"6fc28662b2c48003","name":"Daily Cleanup","props":[{"p":"payload"}],"repeat":"","crontab":"00 00 * 
* *","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":180,"y":680,"wires":[["7c78d13a.233a4"]]},{"id":"7c78d13a.233a4","type":"function","z":"f8abdfa5.0da4f","g":"6fc28662b2c48003","name":"Cleanup Old Messages","func":"msg.topic = \"DELETE FROM messages WHERE timestamp <= datetime('now', '-30 days')\";\nreturn msg;","outputs":1,"timeout":"","noerr":0,"initialize":"","finalize":"","libs":[],"x":390,"y":680,"wires":[["3837f61d.ff5022"]]},{"id":"3837f61d.ff5022","type":"sqlite","z":"f8abdfa5.0da4f","g":"6fc28662b2c48003","mydb":"66829cd1.e354a4","sqlquery":"msg.topic","sql":"","name":"Execute Cleanup","x":640,"y":680,"wires":[[]]},{"id":"955d6c9676632b1b","type":"inject","z":"f8abdfa5.0da4f","g":"17372a6ef6cb8c81","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":160,"y":480,"wires":[["3df68015d2747dea"]]},{"id":"3df68015d2747dea","type":"template","z":"f8abdfa5.0da4f","g":"17372a6ef6cb8c81","name":"Create Table","field":"topic","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"CREATE TABLE IF NOT EXISTS messages (\n timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,\n topic TEXT,\n payload TEXT,\n mqttStatus TEXT\n);","output":"str","x":350,"y":480,"wires":[["e74ca593efeaa144"]]},{"id":"e74ca593efeaa144","type":"sqlite","z":"f8abdfa5.0da4f","g":"17372a6ef6cb8c81","mydb":"66829cd1.e354a4","sqlquery":"msg.topic","sql":"","name":"","x":510,"y":520,"wires":[["32e6ff7698366843"]]},{"id":"32e6ff7698366843","type":"debug","z":"f8abdfa5.0da4f","g":"17372a6ef6cb8c81","name":"debug 
104","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":670,"y":520,"wires":[]},{"id":"2a7cecdbdea8517c","type":"inject","z":"f8abdfa5.0da4f","g":"17372a6ef6cb8c81","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":160,"y":560,"wires":[["5ef68a466c80b23b"]]},{"id":"5ef68a466c80b23b","type":"template","z":"f8abdfa5.0da4f","g":"17372a6ef6cb8c81","name":"Delete Table","field":"topic","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"DROP TABLE IF EXISTS messages;","output":"str","x":350,"y":560,"wires":[["e74ca593efeaa144"]]},{"id":"3f54a85d529ceda0","type":"debug","z":"f8abdfa5.0da4f","g":"c4826facfa237e4a","name":"debug 105","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":930,"y":80,"wires":[]},{"id":"452a8c9f8136727f","type":"inject","z":"f8abdfa5.0da4f","g":"523fec971c39868e","name":"Get Results","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":170,"y":780,"wires":[["47244313ddf2d925"]]},{"id":"47244313ddf2d925","type":"sqlite","z":"f8abdfa5.0da4f","g":"523fec971c39868e","mydb":"66829cd1.e354a4","sqlquery":"fixed","sql":"SELECT * FROM messages;","name":"","x":330,"y":780,"wires":[["de1c6c284ce6eab9"]]},{"id":"de1c6c284ce6eab9","type":"debug","z":"f8abdfa5.0da4f","g":"523fec971c39868e","name":"debug 106","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":510,"y":780,"wires":[]},{"id":"f67c5ac935d3798d","type":"debug","z":"f8abdfa5.0da4f","g":"3ac298cf0d9a130d","name":"debug 
108","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1592,"y":78,"wires":[]},{"id":"38bd406c5515edbd","type":"mqtt in","z":"f8abdfa5.0da4f","g":"3ac298cf0d9a130d","name":"","topic":"juba/test","qos":"2","datatype":"auto-detect","broker":"9b99f9e0.924008","nl":false,"rap":true,"rh":0,"inputs":0,"x":1422,"y":78,"wires":[["f67c5ac935d3798d"]]},{"id":"a6f49b7d3b13d893","type":"change","z":"f8abdfa5.0da4f","g":"c4826facfa237e4a","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"params[1]","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":930,"y":120,"wires":[["c4d4891d.2716f"]]},{"id":"eaad946b42ab4559","type":"status","z":"f8abdfa5.0da4f","g":"569c965744f3f1d2","name":"MQTT Status","scope":["c4d4891d.2716f"],"x":158,"y":242,"wires":[["62e516c469624bfe","1b8fae1cb765465b"]]},{"id":"62e516c469624bfe","type":"function","z":"f8abdfa5.0da4f","g":"569c965744f3f1d2","name":"Store Status","func":"global.set('mqttStatus', msg.status.text);\nreturn msg;","outputs":1,"timeout":"","noerr":0,"initialize":"","finalize":"","libs":[],"x":368,"y":222,"wires":[[]]},{"id":"f15320139bd9c356","type":"rbe","z":"f8abdfa5.0da4f","g":"569c965744f3f1d2","name":"","func":"rbe","gap":"","start":"","inout":"out","septopics":true,"property":"payload","topi":"topic","x":518,"y":282,"wires":[["d82a17b6b934cbc4"]]},{"id":"1b8fae1cb765465b","type":"change","z":"f8abdfa5.0da4f","g":"569c965744f3f1d2","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"status.text","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":368,"y":282,"wires":[["f15320139bd9c356"]]},{"id":"05af3e1d555fcffa","type":"template","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","name":"","field":"topic","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"SELECT timestamp, topic, payload, mqttStatus \nFROM messages \nWHERE mqttStatus != 
'node-red:common.status.connected';","output":"str","x":840,"y":260,"wires":[["c264f041824c47f5"]]},{"id":"16df61560a4558c3","type":"debug","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","name":"debug 110","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1302,"y":218,"wires":[]},{"id":"eb0735439bcb9a8f","type":"inject","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","name":"Test","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":822,"y":338,"wires":[["05af3e1d555fcffa"]]},{"id":"c264f041824c47f5","type":"sqlite","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","mydb":"66829cd1.e354a4","sqlquery":"msg.topic","sql":"CREATE TABLE IF NOT EXISTS messages (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n topic TEXT,\n payload TEXT,\n timestamp DATETIME DEFAULT CURRENT_TIMESTAMP\n);","name":"Check for unsent messages","x":1040,"y":260,"wires":[["16df61560a4558c3","8756f4c9f441d703","f24ee3daf21307b5"]]},{"id":"8756f4c9f441d703","type":"split","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":1322,"y":278,"wires":[["e2459324818bd95b","8131c503fa489881"]]},{"id":"e2459324818bd95b","type":"debug","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","name":"debug 
111","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1502,"y":238,"wires":[]},{"id":"8131c503fa489881","type":"change","z":"f8abdfa5.0da4f","g":"400afb2f8fd43e75","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":1512,"y":278,"wires":[["c4d4891d.2716f"]]},{"id":"f24ee3daf21307b5","type":"template","z":"f8abdfa5.0da4f","g":"6a92c35bed3e664e","name":"","field":"topic","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"UPDATE messages \nSET mqttStatus = 'node-red:common.status.connected' \nWHERE mqttStatus != 'node-red:common.status.connected';\n","output":"str","x":1028,"y":442,"wires":[["60149157bd95319b"]]},{"id":"60149157bd95319b","type":"sqlite","z":"f8abdfa5.0da4f","g":"6a92c35bed3e664e","mydb":"66829cd1.e354a4","sqlquery":"msg.topic","sql":"CREATE TABLE IF NOT EXISTS messages (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n topic TEXT,\n payload TEXT,\n timestamp DATETIME DEFAULT CURRENT_TIMESTAMP\n);","name":"Update Message to status Sent","x":1258,"y":442,"wires":[["7c726d4ab7853449"]]},{"id":"7c726d4ab7853449","type":"debug","z":"f8abdfa5.0da4f","g":"6a92c35bed3e664e","name":"debug 112","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1498,"y":442,"wires":[]},{"id":"d82a17b6b934cbc4","type":"switch","z":"f8abdfa5.0da4f","g":"569c965744f3f1d2","name":"","property":"payload","propertyType":"msg","rules":[{"t":"eq","v":"node-red:common.status.connected","vt":"str"}],"checkall":"true","repair":false,"outputs":1,"x":638,"y":282,"wires":[["05af3e1d555fcffa"]]},{"id":"66829cd1.e354a4","type":"sqlitedb","db":"test","mode":"RWC"},{"id":"9b99f9e0.924008","type":"mqtt-broker","name":"MQTT 
Broker","broker":"test.mosquitto.org","port":"1883","clientid":"node-red","autoConnect":true,"usetls":false,"compatmode":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"autoUnsubscribe":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]