Kafka + ODBC
JDubbTX example flow
Use this flow to:
- Query data from an ODBC data source and publish it to Kafka
- Consume the same message from Kafka and insert it into a table, again using ODBC
[
  {"id":"22e214a787de4a4a","type":"tab","label":"ODBC and Kafka Demo","disabled":false,"info":"","env":[]},
  {"id":"55ebd1310b11aa56","type":"consumer","z":"22e214a787de4a4a","name":"","broker":"1e4aef0f42a89f55","outOfRangeOffset":"earliest","fromOffset":"latest","topic":"customers","groupid":"Nodered-group1","encoding":"utf8","x":140,"y":280,"wires":[["5ec160616cab3b0c"]]},
  {"id":"3da38cb48d778353","type":"debug","z":"22e214a787de4a4a","name":"debug 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":720,"y":240,"wires":[]},
  {"id":"022228e27702e002","type":"ODBC","z":"22e214a787de4a4a","connection":"8b6a9780ca9bcc1e","name":"","query":"insert into customer_ddl_table2 values ( \n {{{payload.CUSTOMER_ID}}},\n '{{{payload.CUSTOMER_NAME_LAST}}}',\n '{{{payload.CUSTOMER_NAME_FIRST}}}',\n '{{{payload.CUSTOMER_NAME_MI}}}' \n)","outField":"payload","x":710,"y":280,"wires":[["d2208751692f0a0a"]]},
  {"id":"d2208751692f0a0a","type":"debug","z":"22e214a787de4a4a","name":"debug 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":840,"y":280,"wires":[]},
  {"id":"5ec160616cab3b0c","type":"function","z":"22e214a787de4a4a","name":"function: Get Kafka Value Array","func":"// This function receives a Kafka message whose value is a JSON string\n// and returns that value without the extra Kafka metadata.\n\n// Check that msg.payload and msg.payload.value are defined\nif (msg.payload && msg.payload.value) {\n    try {\n        // Parse the JSON string in msg.payload.value\n        var payloadArray = JSON.parse(msg.payload.value);\n\n        // Only forward a non-empty array\n        if (Array.isArray(payloadArray) && payloadArray.length > 0) {\n            // Replace the payload with the parsed array of objects\n            msg.payload = payloadArray;\n            return msg;\n        } else {\n            // Report through Node-RED so the error reaches the debug sidebar and Catch nodes\n            node.error(\"Error: Invalid array in msg.payload.value\", msg);\n        }\n    } catch (error) {\n        node.error(\"Error parsing JSON: \" + error.message, msg);\n    }\n} else {\n    node.error(\"Error: msg.payload.value is undefined or null\", msg);\n}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":370,"y":280,"wires":[["bde262f5788ab5a9"]]},
  {"id":"bde262f5788ab5a9","type":"split","z":"22e214a787de4a4a","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":570,"y":280,"wires":[["3da38cb48d778353","022228e27702e002"]]},
  {"id":"63a08780ac9911a5","type":"inject","z":"22e214a787de4a4a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"501","payloadType":"num","x":110,"y":140,"wires":[["1503670f081270cf"]]},
  {"id":"1503670f081270cf","type":"ODBC","z":"22e214a787de4a4a","connection":"8b6a9780ca9bcc1e","name":"","query":"select * from customer\nwhere customer_id = {{{ payload }}}\n","outField":"payload","x":230,"y":140,"wires":[["920254e237b987be"]]},
  {"id":"920254e237b987be","type":"json","z":"22e214a787de4a4a","name":"","property":"payload","action":"","pretty":false,"x":350,"y":140,"wires":[["a60904a3ecd4f5ab"]]},
  {"id":"a60904a3ecd4f5ab","type":"producer","z":"22e214a787de4a4a","name":"","broker":"1e4aef0f42a89f55","topic":"customers","requireAcks":1,"ackTimeoutMs":100,"attributes":0,"x":500,"y":140,"wires":[]},
  {"id":"9ef0018f59ef3c83","type":"comment","z":"22e214a787de4a4a","name":"Kafka ODBC Producer","info":"# Kafka ODBC Producer v1","x":160,"y":100,"wires":[]},
  {"id":"4bfd6da0f764530e","type":"comment","z":"22e214a787de4a4a","name":"Kafka ODBC Consumer","info":"","x":160,"y":240,"wires":[]},
  {"id":"1e4aef0f42a89f55","type":"kafka-broker","name":"","hosts":"localhost:29092","usesasl":false,"username":"","password":"","usetls":false,"cacert":"","clientcert":"","privatekey":"","passphrase":"","selfsign":false},
  {"id":"8b6a9780ca9bcc1e","type":"ODBC_CONNECTION","connection":"DSN=PUB400"}
]