Kafka + ODBC

JDubbTX example flow

Use this flow to

  1. Query data from an ODBC data source and publish it to Kafka
  2. Consume the same message from Kafka and do an insert to a table, again using ODBC
[{"id":"22e214a787de4a4a","type":"tab","label":"ODBC and Kafka Demo","disabled":false,"info":"","env":[]},{"id":"55ebd1310b11aa56","type":"consumer","z":"22e214a787de4a4a","name":"","broker":"1e4aef0f42a89f55","outOfRangeOffset":"earliest","fromOffset":"latest","topic":"customers","groupid":"Nodered-group1","encoding":"utf8","x":140,"y":280,"wires":[["5ec160616cab3b0c"]]},{"id":"3da38cb48d778353","type":"debug","z":"22e214a787de4a4a","name":"debug 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":720,"y":240,"wires":[]},{"id":"022228e27702e002","type":"ODBC","z":"22e214a787de4a4a","connection":"8b6a9780ca9bcc1e","name":"","query":"insert into customer_ddl_table2 values ( \n    {{{payload.CUSTOMER_ID}}},\n    '{{{payload.CUSTOMER_NAME_LAST}}}',\n    '{{{payload.CUSTOMER_NAME_FIRST}}}',\n    '{{{payload.CUSTOMER_NAME_MI}}}' \n)","outField":"payload","x":710,"y":280,"wires":[["d2208751692f0a0a"]]},{"id":"d2208751692f0a0a","type":"debug","z":"22e214a787de4a4a","name":"debug 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":840,"y":280,"wires":[]},{"id":"5ec160616cab3b0c","type":"function","z":"22e214a787de4a4a","name":"function:  Get Kafka Value Array","func":"// This function recieves a kafka message with JSON value \n// and returns that value without the extra kafka metadata   \n  \n  // Check if msg.payload and msg.payload.value are defined\n  if (msg.payload && msg.payload.value) {\n\n    try {\n      // Extract values from the JSON string in msg.payload.value\n      var payloadArray = JSON.parse(msg.payload.value);\n\n      // Ensure payloadArray is an array and not null/undefined\n      if (Array.isArray(payloadArray) && payloadArray.length > 0) {        \n\n        // Set returned payload the parsed array of objects\n        msg.payload = payloadArray;\n\n        // Return the message\n        return msg;\n      } else {\n        console.error(\"Error: Invalid array in msg.payload.value\");\n      }\n    } catch (error) {\n      console.error(\"Error parsing JSON:\", error);\n    }\n  } else {\n    console.error(\"Error: msg.payload.value is undefined or null\");\n  }","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":370,"y":280,"wires":[["bde262f5788ab5a9"]]},{"id":"bde262f5788ab5a9","type":"split","z":"22e214a787de4a4a","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":570,"y":280,"wires":[["3da38cb48d778353","022228e27702e002"]]},{"id":"63a08780ac9911a5","type":"inject","z":"22e214a787de4a4a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"501","payloadType":"num","x":110,"y":140,"wires":[["1503670f081270cf"]]},{"id":"1503670f081270cf","type":"ODBC","z":"22e214a787de4a4a","connection":"8b6a9780ca9bcc1e","name":"","query":"select * from customer\nwhere customer_id = {{{ payload }}}\n","outField":"payload","x":230,"y":140,"wires":[["920254e237b987be"]]},{"id":"920254e237b987be","type":"json","z":"22e214a787de4a4a","name":"","property":"payload","action":"","pretty":false,"x":350,"y":140,"wires":[["a60904a3ecd4f5ab"]]},{"id":"a60904a3ecd4f5ab","type":"producer","z":"22e214a787de4a4a","name":"","broker":"1e4aef0f42a89f55","topic":"customers","requireAcks":1,"ackTimeoutMs":100,"attributes":0,"x":500,"y":140,"wires":[]},{"id":"9ef0018f59ef3c83","type":"comment","z":"22e214a787de4a4a","name":"Kafka ODBC Producer","info":"# Kafak ODBC Producer v1","x":160,"y":100,"wires":[]},{"id":"4bfd6da0f764530e","type":"comment","z":"22e214a787de4a4a","name":"Kafka ODBC Consumer","info":"","x":160,"y":240,"wires":[]},{"id":"1e4aef0f42a89f55","type":"kafka-broker","name":"","hosts":"localhost:29092","usesasl":false,"username":"","password":"","usetls":false,"cacert":"","clientcert":"","privatekey":"","passphrase":"","selfsign":false},{"id":"8b6a9780ca9bcc1e","type":"ODBC_CONNECTION","connection":"DSN=PUB400"}]

Flow Info

Created 1 year, 6 months ago
Rating: 4 1

Owner

Actions

Rate:

Node Types

Core
  • comment (x2)
  • debug (x2)
  • function (x1)
  • inject (x1)
  • json (x1)
  • split (x1)
Other
  • ODBC (x2)
  • ODBC_CONNECTION (x1)
  • consumer (x1)
  • kafka-broker (x1)
  • producer (x1)
  • tab (x1)

Tags

  • ODBC
  • Kafka
Copy this flow JSON to your clipboard and then import into Node-RED using the Import From > Clipboard (Ctrl-I) menu option