Edge Extract Transform Load (ETL) Data Collection Pattern

This subflow pushes data to an HTTP endpoint. It accepts JSON input (an object or an array) in msg.payload, which is treated as the data to be pushed. The flow is intended to run as a service at the edge, acting as an intermediate data collector. The main goal is a robust and reliable data collection process that is not affected by loss of network connectivity or other disruptions at the edge. The main logic of the flow appears in the following figure.

Figure: Edge ETL Service pattern

The Node-RED input layer can be used to adapt to the diverse protocols needed on the Edge IoT side. The following figure shows an example of a local HTTP In interface that feeds the Edge ETL subflow. Any other type of Node-RED connector node (AMQP, MQTT, etc.) can be used instead to get the data into msg.payload.

Figure: usage example (HTTP In interface feeding the Edge ETL subflow)
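Whatever connector is used, the subflow expects a JSON object or array in msg.payload. As a minimal sketch of what an input layer might do before handing data over, the hypothetical helper below (normalisePayload is not part of the flow) accepts a parsed object, a JSON string, or a binary buffer, and rejects anything that is not an object or array. How a given connector actually delivers its data is an assumption to check per node.

```javascript
// Hypothetical helper: turn whatever a connector delivers into a JSON object or array.
function normalisePayload(raw) {
    // Binary input (for example from a messaging client) is decoded as UTF-8 text first.
    const value = Buffer.isBuffer(raw) ? raw.toString("utf8") : raw;
    // Text input is expected to be a JSON document.
    const parsed = typeof value === "string" ? JSON.parse(value) : value;
    // Scalars and null are rejected, since the subflow expects an object or an array.
    if (parsed === null || typeof parsed !== "object") {
        throw new TypeError("payload must be a JSON object or array");
    }
    return parsed;
}

console.log(normalisePayload('{"sensor":"s1","value":21.5}'));
console.log(normalisePayload(Buffer.from("[1,2,3]")));
console.log(normalisePayload({ sensor: "s2" }));
```

In a Node-RED function node, the same logic would be applied to msg.payload before the message is passed to the subflow.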

Arbitrary preprocessing can then be performed in the inner flow, to reduce the need to do it later on the Cloud side, where it may involve huge volumes of data coming from multiple sources. After that, the flow tries to push the data to the main Cloud storage service, applying the retry pattern. The retry settings are configured in the subflow UI.

Figure: Edge ETL subflow node settings UI
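The retry pattern can be sketched in plain JavaScript. The field names msg.thisretry, msg.originalpayload and msg.statusCode follow the subflow's function and switch nodes, and retryLimit stands in for the retrylimit setting. The helper names pushSucceeded and afterAttempt are hypothetical, and treating a missing status code as a failure is an assumption of this sketch.

```javascript
// Hypothetical helper: decide whether a push attempt succeeded.
function pushSucceeded(msg) {
    const code = Number.parseInt(msg.statusCode, 10);
    // Assumption: a missing or unreadable status code counts as a failure.
    return Number.isInteger(code) && code < 400;
}

// Hypothetical helper: decide what happens after one push attempt.
// retryLimit plays the role of the subflow's `retrylimit` setting.
function afterAttempt(msg, retryLimit) {
    if (pushSucceeded(msg)) {
        return { action: "done", msg };
    }
    // Count the failed attempt and restore the data that the HTTP response replaced.
    const next = { ...msg, thisretry: msg.thisretry + 1, payload: msg.originalpayload };
    // Below the limit the push is repeated after a delay; at the limit the data go to local storage.
    return { action: next.thisretry < retryLimit ? "retry" : "store", msg: next };
}

// Simulate an endpoint that keeps answering 503.
let current = { thisretry: 0, originalpayload: { value: 21.5 }, payload: "Service Unavailable", statusCode: 503 };
const actions = [];
while (actions[actions.length - 1] !== "store") {
    const result = afterAttempt(current, 5);
    actions.push(result.action);
    // The next failed response overwrites the payload again.
    current = { ...result.msg, payload: "Service Unavailable" };
}
console.log(actions.join(", "));
```

The status check is explicit because the HTTP request node can report a failed call inside the message instead of raising an error, in which case a catch node alone would not see it.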

If the push is still unsuccessful after the configured number of retries, the data are stored locally in an SQLite database (default location: /data/lost_data.db). Periodically (every 120 seconds by default, as set in the CRON JOB node of the subflow), a job reads the local lost-data database and tries again to push its contents centrally, while avoiding the insertion of duplicate values. The subflow internals appear in the following figure.

Figure: subflow internals
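The local fallback and its clean-up can be sketched as helpers that build the SQL for the lost_data table (columns timestamp and payload, as created by the subflow). The msg.olddata flag, which marks records re-read from the database, is taken from the flow. Unlike the flow's function nodes, this sketch uses ? placeholders with the values carried separately instead of concatenating them into the statement. Whether and how the SQLite node in use accepts bound values is an assumption to check against its documentation. The helper names are hypothetical.

```javascript
// Records that were re-read from the local database must not be inserted a second time.
function shouldInsert(msg) {
    return !msg.olddata;
}

// Build the statement that stores an undelivered record in the local table.
function buildInsert(msg) {
    // Serialise objects and arrays so they fit the text column; keep strings as they are.
    const body = typeof msg.originalpayload === "string"
        ? msg.originalpayload
        : JSON.stringify(msg.originalpayload);
    return {
        sql: "INSERT INTO lost_data (timestamp, payload) VALUES (?, ?)",
        params: [String(msg.timestamp), body]
    };
}

// Build the statement that removes a record once it has been pushed successfully.
function buildDelete(msg) {
    return {
        sql: "DELETE FROM lost_data WHERE timestamp = ?",
        params: [String(msg.timestamp)]
    };
}

const fresh = { timestamp: 1700000000000, originalpayload: { sensor: "s1", value: 3 } };
const replayed = { ...fresh, olddata: true };
console.log(shouldInsert(fresh), buildInsert(fresh));
console.log(shouldInsert(replayed), buildDelete(replayed));
```

Keeping the values out of the SQL text means a payload containing quotes cannot break the statement. Deleting by timestamp alone, as the flow currently does, assumes that no two stored records share a timestamp.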

The target POST endpoint can be configured in the node UI, as well as the number of retries before temporarily giving up on pushing. Credentials for the outgoing HTTP call need to be set in the "pushout" node. Several tester flows are also included for local testing, including one that applies dummy connection details to simulate a network outage.

Figure: helper flows for local testing

If more than one instance of the subflow needs to be deployed on a server (e.g. for pushing different data points to different endpoints), suitable modifications are needed, since all instances would otherwise reuse the same database. A different database filename should be used for each instance.

[{"id":"906076393d9542b2","type":"subflow","name":"Edge ETL Service to HTTP out","info":"This subflow handles the pushing of data to an HTTP out endpoint. It accepts JSON input (object or array) in the msg.payload, which is considered the data to be pushed.\n\nThe target POST endpoint can be configured in the node UI, as well as the number of retries before temporarily giving up on pushing. If that limit is reached, the data are stored in a local sqlite db, from which they are retrieved periodically (period set in the CRON JOB node of the subflow) and retried.\n\nCredentials for the HTTP out node need to be set in the \"pushout\" node.\n\nIf more than one instances of the subflow need to be deployed on a server (e.g. for pushing different data points to different endpoints), suitable modifications are needed since they would reuse the same db. So a different filename for a db should be used in each case. ","category":"PHYSICS","in":[{"x":260,"y":420,"wires":[{"id":"b2179ac409d6bf0f"}]}],"out":[],"env":[{"name":"retrylimit","type":"num","value":"5"},{"name":"targetUrl","type":"str","value":"http://10.100.59.183:1881/pushData"}],"meta":{},"color":"#C7E9C0"},{"id":"9070819b81e63623","type":"comment","z":"906076393d9542b2","name":"INPUT PHASE- Value in msg.payload","info":"This is specific to a given UC, and can be tailored to the way information is retrieved (i.e. IoT protocol). Node-RED has an abundance of relevant nodes and supported protocols that can be installed on demand through the \"Manage Palette\" option in the top right menu.","x":370,"y":340,"wires":[]},{"id":"79b86fc9ea03ebef","type":"comment","z":"906076393d9542b2","name":"EXAMPLE HTTP OUTPUT PHASE","info":"UC specific, depends on the outgoing system to which the data will be persisted. 
Node-RED has an abundance of relevant nodes, including database client nodes, cloud systems nodes, messaging system nodes etc that can be installed on demand through the \"Manage Palette\" option in the top right menu..","x":1200,"y":480,"wires":[]},{"id":"b3c059d63fbc22ed","type":"catch","z":"906076393d9542b2","name":"CATCHABLE ERROR","scope":["150e410b10193947"],"uncaught":false,"x":600,"y":680,"wires":[["3adde18d089bb01c"]]},{"id":"0b1148f8c5de31a5","type":"delay","z":"906076393d9542b2","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":1280,"y":640,"wires":[["1487b3bcf5479ead"]]},{"id":"3adde18d089bb01c","type":"function","z":"906076393d9542b2","name":"retry+1 & restore contents","func":"msg.thisretry=msg.thisretry+1;\nmsg.payload=msg.originalpayload;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":870,"y":680,"wires":[["10a3e967af1bf5d5","1d706e16bbb158f4"]]},{"id":"10a3e967af1bf5d5","type":"switch","z":"906076393d9542b2","name":"Retry condition","property":"thisretry","propertyType":"msg","rules":[{"t":"lt","v":"retrylimit","vt":"env"},{"t":"gte","v":"retrylimit","vt":"env"}],"checkall":"true","repair":false,"outputs":2,"x":1100,"y":680,"wires":[["0b1148f8c5de31a5"],["55a4313698d81331"]]},{"id":"55a4313698d81331","type":"function","z":"906076393d9542b2","name":"insert to Data Loss DB","func":"\n\nif (msg.olddata) {\n    return [msg,null];\n} else {\n    msg.topic='INSERT INTO lost_data (timestamp, payload) values(\\\"'+msg.timestamp+'\\\",json_quote(\\''+msg.originalpayload+'\\'))';\n    //msg.topic='INSERT INTO lost_data (timestamp, payload) values(',${msg.timestamp},${msg.originalpayload});\n    //msg.topic = `INSERT INTO lost_data (timestamp, payload) VALUES (${msg.timestamp}, ${JSON.stringify(msg.originalpayload)});`\n    \n    //original: msg.topic='INSERT INTO 
lost_data (timestamp, payload) values(\\\"'+msg.timestamp+'\\\",\\\"'+msg.originalpayload+'\\\")';\n    return [null,msg];\n}\n//we could include another point in the table indicating the format of the data eg. JSON, XML etc that could be used as\n//arguments in further processing flows in other areas or in transformation from string to format\n//maybe this is needed in any case in this normal operation as well\n","outputs":2,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1320,"y":720,"wires":[["40b24d90c2f3d8ff"],["68b464d4a6146c8e","96d0655bde3648ac"]]},{"id":"40b24d90c2f3d8ff","type":"debug","z":"906076393d9542b2","name":"ALREADY IN","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1550,"y":680,"wires":[]},{"id":"c2ce991733bf9dd0","type":"function","z":"906076393d9542b2","name":"Keep contents/retry def","func":"msg.thisretry=0;\n\nif (msg.olddata){\n    msg.timestamp=msg.payload.timestamp;\n    msg.originalpayload=msg.payload.payload;\n    msg.payload=msg.originalpayload;\n} else{\n    msg.timestamp=Date.now();\n    msg.originalpayload=msg.payload;\n}\n\nmsg.url=env.get('targetUrl');\n//msg.url='http://10.100.59.183:1881/pushData2'; //dummy fail\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":870,"y":540,"wires":[["1487b3bcf5479ead","e72d30206221babf"]]},{"id":"e65602605fa6eca1","type":"comment","z":"906076393d9542b2","name":"POINTS OF ATTENTION","info":"The retry field nees to be created as different with comparison to msg.retry, since the http node creates such a msg field in each request. Therefore the pre-existing field for retries in this flow will be overwritten by the http node.\n\nFurthermore, the original contents are in the msg.payload field of the incoming message at the beginning of the flow. 
In the http request node, the msg.payload contents are replaced by the response of the call, hence we need to ensure that we keep the contents of the initial input message that we need to store. These are moved in the \"Keep contents\" function in the msg.originalpayload field.\n\nOne final point of attention is that some nodes, like the http request node, do not throw a nodejs error event when the call fails, but instead the error is included in the msg. Therefore the catch node is not able to detect the failed call. For these nodes, the extra switch node of the http request node group can be used as an example of how  to detect per case the error condition. An extra step is the direct link between the switch node and the \"retry+1\" node, which is not needed in a typical catch case of a node that throws an nodejs error event on failure.\nMore details on error handling can be found here:\nhttps://nodered.org/docs/user-guide/handling-error\n\nAt the moment, deletion from the DB is based on msg.payload and msg.timestamp comparison, which implies that inside the payload there should be a unique identifier. 
However, typically this is the case in IoT related sensing, whereas the existence of timestamps of collection can also serve as a distinguishing factor between records with similar other fields.","x":830,"y":740,"wires":[]},{"id":"68b464d4a6146c8e","type":"sqlite","z":"906076393d9542b2","mydb":"2f1e96eb92bb0e38","sqlquery":"msg.topic","sql":"","name":"","x":1570,"y":720,"wires":[[]]},{"id":"51fc5b20a065efab","type":"function","z":"906076393d9542b2","name":"Delete from DB","func":"msg.payload=msg.originalpayload;\n\n//msg.topic='delete from lost_data where payload=\\''+msg.payload+'\\' and timestamp='+msg.timestamp;\nmsg.topic='delete from lost_data where timestamp='+msg.timestamp;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1540,"y":540,"wires":[["400e189822378c10"]]},{"id":"400e189822378c10","type":"sqlite","z":"906076393d9542b2","mydb":"2f1e96eb92bb0e38","sqlquery":"msg.topic","sql":"","name":"","x":1770,"y":540,"wires":[[]]},{"id":"0edab0b1878659e3","type":"function","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"DROP TABLE","func":"\nmsg.topic='drop table lost_data';\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":500,"y":1260,"wires":[["174b2043633e01c1"]]},{"id":"8599633aa42b556f","type":"inject","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"TEST","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":"","topic":"","payloadType":"str","x":350,"y":1260,"wires":[["0edab0b1878659e3"]]},{"id":"90f54b11a6a36c1e","type":"http in","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"","url":"/pushData2","method":"post","upload":false,"swaggerDoc":"","x":640,"y":1120,"wires":[["15890f2b67539144","86799a5927841ee5","3417e97479b9ceeb"]]},{"id":"3417e97479b9ceeb","type":"debug","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"RECEIVED 
CONTENTS","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":910,"y":1100,"wires":[]},{"id":"15890f2b67539144","type":"http response","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"","statusCode":"","headers":{},"x":890,"y":1140,"wires":[]},{"id":"f17efed9a33810a8","type":"comment","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"DUMMY TEST SERVICE","info":"","x":650,"y":1080,"wires":[]},{"id":"ab03aac9d7087ea9","type":"inject","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"TEST","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":"","topic":"","payloadType":"str","x":350,"y":1180,"wires":[["5032719a4615b1f9"]]},{"id":"5032719a4615b1f9","type":"function","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"SEE ALL ENTRIES","func":"\nmsg.topic='select * from lost_data';\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":510,"y":1180,"wires":[["174b2043633e01c1"]]},{"id":"174b2043633e01c1","type":"sqlite","z":"906076393d9542b2","g":"eda564e1ad76cd2a","mydb":"2f1e96eb92bb0e38","sqlquery":"msg.topic","sql":"","name":"","x":730,"y":1180,"wires":[["1de63f5e4c78da81"]]},{"id":"1de63f5e4c78da81","type":"debug","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":930,"y":1180,"wires":[]},{"id":"1487b3bcf5479ead","type":"http request","z":"906076393d9542b2","g":"b52a406dc55537d2","name":"pushout","method":"POST","ret":"txt","paytoqs":"ignore","url":"","tls":"ee95c4c29f5ef08c","persist":false,"proxy":"","authType":"","x":1100,"y":540,"wires":[["a04a8abf407a8267"]]},{"id":"a04a8abf407a8267","type":"switch","z":"906076393d9542b2","g":"b52a406dc55537d2","name":"Status 
code>=400","property":"statusCode","propertyType":"msg","rules":[{"t":"lt","v":"400","vt":"str"},{"t":"gte","v":"400","vt":"str"}],"checkall":"true","repair":false,"outputs":2,"x":1290,"y":540,"wires":[["51fc5b20a065efab"],["3adde18d089bb01c"]]},{"id":"77a88e7de4ef4dbf","type":"function","z":"906076393d9542b2","g":"2d0f7ba22222e26e","name":"SELECT ALL","func":"\nmsg.topic='select timestamp,payload from lost_data';\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":830,"y":160,"wires":[["40a926afd38394e1"]]},{"id":"40a926afd38394e1","type":"sqlite","z":"906076393d9542b2","g":"2d0f7ba22222e26e","mydb":"2f1e96eb92bb0e38","sqlquery":"msg.topic","sql":"","name":"","x":1030,"y":160,"wires":[["c8af65d51e21f4a0"]]},{"id":"363cffdd8ab3836b","type":"function","z":"906076393d9542b2","g":"2d0f7ba22222e26e","name":"SEND TO TYPICAL FLOW","func":"\nmsg.olddata=true;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":980,"y":220,"wires":[["8d7414e20d8956c8"]]},{"id":"52b0e080b9593f76","type":"inject","z":"906076393d9542b2","g":"2d0f7ba22222e26e","name":"CRON JOB","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"120","crontab":"","once":false,"onceDelay":0.1,"topic":"","payloadType":"date","x":650,"y":160,"wires":[["77a88e7de4ef4dbf"]]},{"id":"c8af65d51e21f4a0","type":"split","z":"906076393d9542b2","g":"2d0f7ba22222e26e","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":1210,"y":160,"wires":[["363cffdd8ab3836b"]]},{"id":"bb06398443970437","type":"comment","z":"906076393d9542b2","g":"2d0f7ba22222e26e","name":"RETRY PAST 
FAILS","info":"","x":650,"y":200,"wires":[]},{"id":"1adb58484c2e1ff2","type":"inject","z":"906076393d9542b2","g":"37ae854edd52f2f2","name":"AUTO","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":true,"onceDelay":"","topic":"","payloadType":"str","x":490,"y":880,"wires":[["b09347b3ff6535ad"]]},{"id":"c6eba813af3aba40","type":"sqlite","z":"906076393d9542b2","g":"37ae854edd52f2f2","mydb":"2f1e96eb92bb0e38","sqlquery":"msg.topic","sql":"","name":"","x":910,"y":880,"wires":[[]]},{"id":"b09347b3ff6535ad","type":"function","z":"906076393d9542b2","g":"37ae854edd52f2f2","name":"CREATE DATA LOSS TABLE","func":"\nmsg.topic='create table lost_data (\\\nid integer primary key autoincrement,timestamp text,payload text,format text)';\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":680,"y":880,"wires":[["c6eba813af3aba40"]]},{"id":"e99955506f78d77a","type":"function","z":"906076393d9542b2","g":"37ae854edd52f2f2","name":"CONF","func":"//flow.set(\"retrylimit\",5);\n//flow.set(\"targetUrl\",\"http://10.100.59.183:1881/pushData\");\n\nflow.set(\"retrylimit\",env.get('retrylimit'));\nflow.set(\"targetUrl\",env.get('targetUrl'));\nmsg.payload={};\n\nmsg.payload.targetUrl=flow.get('targetUrl');\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":670,"y":800,"wires":[["e1b46463b11a0e8d"]]},{"id":"b102d82502849623","type":"inject","z":"906076393d9542b2","g":"37ae854edd52f2f2","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":true,"onceDelay":0.1,"topic":"","payloadType":"date","x":520,"y":800,"wires":[["e99955506f78d77a"]]},{"id":"8987e83b8b488f48","type":"inject","z":"906076393d9542b2","g":"f8a55e07e87c2cd7","name":"Normal Operation","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"\"MY 
PAYLOAD\"","payloadType":"str","x":380,"y":1380,"wires":[["af9791fe5b686855"]]},{"id":"af9791fe5b686855","type":"http request","z":"906076393d9542b2","g":"f8a55e07e87c2cd7","name":"callin","method":"POST","ret":"txt","paytoqs":"ignore","url":"http://10.100.59.183:1881/datain","tls":"","persist":false,"proxy":"","authType":"basic","x":570,"y":1380,"wires":[["12c5786e6fe413ed"]]},{"id":"12c5786e6fe413ed","type":"debug","z":"906076393d9542b2","g":"f8a55e07e87c2cd7","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":770,"y":1380,"wires":[]},{"id":"96d0655bde3648ac","type":"debug","z":"906076393d9542b2","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1470,"y":800,"wires":[]},{"id":"3761c8310330fb57","type":"inject","z":"906076393d9542b2","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":true,"onceDelay":0.1,"topic":"","payloadType":"date","x":330,"y":1460,"wires":[["2a7334edcfb11b7f"]]},{"id":"2a7334edcfb11b7f","type":"function","z":"906076393d9542b2","name":"CONF -WRONG","func":"\nflow.set(\"targetUrl\",\"http://10.100.59.183:1881/noapi\");\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":570,"y":1460,"wires":[["ce9a7e15d7d4773a"]]},{"id":"ce9a7e15d7d4773a","type":"debug","z":"906076393d9542b2","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":750,"y":1460,"wires":[]},{"id":"245276b1e8e474aa","type":"function","z":"906076393d9542b2","name":"CONF -CORRECT","func":"\nflow.set(\"targetUrl\",\"http://10.100.59.183:1881/pushdata\");\n\n\nreturn 
msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":570,"y":1560,"wires":[[]]},{"id":"82de9423f3a95fac","type":"inject","z":"906076393d9542b2","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"30","crontab":"","once":true,"onceDelay":0.1,"topic":"","payloadType":"date","x":330,"y":1560,"wires":[["245276b1e8e474aa"]]},{"id":"86799a5927841ee5","type":"file","z":"906076393d9542b2","g":"eda564e1ad76cd2a","name":"","filename":"/data/receivedTimestamps","appendNewline":true,"createDir":false,"overwriteFile":"false","encoding":"none","x":1000,"y":1040,"wires":[[]]},{"id":"e1b46463b11a0e8d","type":"debug","z":"906076393d9542b2","g":"37ae854edd52f2f2","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":860,"y":800,"wires":[]},{"id":"e72d30206221babf","type":"debug","z":"906076393d9542b2","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":890,"y":600,"wires":[]},{"id":"b2179ac409d6bf0f","type":"json","z":"906076393d9542b2","name":"","property":"payload","action":"","pretty":false,"x":570,"y":540,"wires":[["c2ce991733bf9dd0"]]},{"id":"8d7414e20d8956c8","type":"json","z":"906076393d9542b2","g":"2d0f7ba22222e26e","name":"","property":"payload.payload","action":"","pretty":false,"x":930,"y":280,"wires":[["c2ce991733bf9dd0"]]},{"id":"1d706e16bbb158f4","type":"debug","z":"906076393d9542b2","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1110,"y":740,"wires":[]},{"id":"ee95c4c29f5ef08c","type":"tls-config","z":"906076393d9542b2","name":"","cert":"","key":"","ca":"","certname":"","keyname":"","caname":"","servername":"","verifyservercert":false,"alpnprotocol":""},{"id":"2f1e96eb92bb0e38","type":"sqlitedb","db":"/data/lost_data.db","mode":"RWC"},{"id":"52cfa0819dab2f
a1","type":"function","z":"1fa9907a127de733","name":"Local Preprocessing","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":380,"y":220,"wires":[["70d56cee1da706bf"]]},{"id":"70d56cee1da706bf","type":"subflow:906076393d9542b2","z":"1fa9907a127de733","name":"","env":[],"x":650,"y":220,"wires":[]},{"id":"fb9c85a02f0bdb75","type":"group","z":"1fa9907a127de733","name":"HTTP POST Input Mode- Data in msg.payload","style":{"label":true},"nodes":["bfa099b4651eafcb","d626aa3066cc8ff7"],"x":194,"y":59,"w":432,"h":122},{"id":"bfa099b4651eafcb","type":"http in","z":"1fa9907a127de733","g":"fb9c85a02f0bdb75","name":"","url":"/datain2","method":"post","upload":false,"swaggerDoc":"","x":290,"y":140,"wires":[["d626aa3066cc8ff7","52cfa0819dab2fa1"]]},{"id":"d626aa3066cc8ff7","type":"http response","z":"1fa9907a127de733","g":"fb9c85a02f0bdb75","name":"","statusCode":"","headers":{},"x":550,"y":100,"wires":[]}]

Flow Info

Created 1 year, 11 months ago

Node Types

Core
  • catch (x1)
  • comment (x5)
  • debug (x9)
  • delay (x1)
  • file (x1)
  • function (x13)
  • http in (x2)
  • http request (x2)
  • http response (x2)
  • inject (x8)
  • json (x2)
  • split (x1)
  • switch (x2)
  • tls-config (x1)
Other

Tags

  • Edge
  • IoT
  • data
  • collection
  • reliable
Copy this flow JSON to your clipboard and then import into Node-RED using the Import From > Clipboard (Ctrl-I) menu option