Use DuckDB Function Node to Process Data
This example shows a flow to transform data using duckdb sql nodes and duckdb function node.
[{"id":"4c80175cd8069f06","type":"tab","label":"Flow 1","disabled":false,"info":"","env":[]},{"id":"7a9768642dd4f58b","type":"inject","z":"4c80175cd8069f06","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":120,"y":240,"wires":[["be529dee6c636efe"]]},{"id":"be529dee6c636efe","type":"duckdb-sql","z":"4c80175cd8069f06","mydb":"1530b6327e89473f","sqlquery":"exec","sql":"create table if not exists test (id int, rm varchar(10), name varchar(255));\ninsert into test values(1, 'test1', 'name1');\ninsert into test values(2, 'test2', 'name2');\ninsert into test values(3, 'test3', 'name3');\ninsert into test values(4, 'test4', 'name4');","name":"insert","x":270,"y":240,"wires":[["c8fc2c56b140f72b"]]},{"id":"c8fc2c56b140f72b","type":"duckdb func","z":"4c80175cd8069f06","name":"test proc","mydb":"1530b6327e89473f","duckdbfuncbatchsize":"100","duckdbfunc":"msg.beforeProc = \"CREATE TABLE IF NOT EXISTS after(id int, name varchar(255));\"\nmsg.procQuery = \"SELECT * FROM test\";\nmsg.proc = function(row) {\n // transform row from proc query\n delete row['rm'];\n // return insert to new table\n return \"INSERT INTO after VALUES(\" + JSON.stringify(Object.values(row)).slice(1, -1).replaceAll('\"', '\\'') + \");\";\n}\nmsg.afterProc = \"SELECT * FROM after LIMIT 100;\";","outputs":1,"noerr":0,"libs":[],"x":420,"y":240,"wires":[["6be5bdb149a437b8","03775b2a03dabe7e"]]},{"id":"6be5bdb149a437b8","type":"table-viewer","z":"4c80175cd8069f06","name":"sql output","property":"payload","fieldType":"msg","width":200,"height":160,"rows":"100","active":true,"outputs":0,"x":420,"y":280,"wires":[]},{"id":"03775b2a03dabe7e","type":"duckdb func","z":"4c80175cd8069f06","name":"convert","mydb":"1530b6327e89473f","duckdbfuncbatchsize":100,"duckdbfunc":"msg.beforeProc = \"CREATE TABLE proc(nodeId varchar(32), data json);\"\nmsg.procQuery = \"SELECT * FROM after\";\nmsg.proc = function(row) {\n // transform row from proc query\n var nodeId = node.id;\n var data = row;\n // return insert to new table\n return \"INSERT INTO proc VALUES('\" + nodeId + \"', '\" + JSON.stringify(data).replaceAll('\"', '\\\"') + \"');\";\n}\nmsg.nodeId = node.id;\nmsg.afterProc = \"SELECT * FROM proc where nodeId = '\" + node.id + \"' LIMIT 10;\";","outputs":1,"noerr":0,"libs":[],"x":660,"y":240,"wires":[["3e0bc2e5e53e65e2","76638ad795fe7149"]]},{"id":"3e0bc2e5e53e65e2","type":"table-viewer","z":"4c80175cd8069f06","name":"convert","property":"payload","fieldType":"msg","width":200,"height":160,"rows":10,"active":true,"outputs":0,"x":660,"y":280,"wires":[]},{"id":"76638ad795fe7149","type":"duckdb func","z":"4c80175cd8069f06","name":"add random","mydb":"1530b6327e89473f","duckdbfuncbatchsize":100,"duckdbfunc":"msg.procQuery = \"SELECT * FROM proc where nodeId = '\" + msg.nodeId + \"'\" ;\nmsg.proc = function(row) {\n // transform row from proc query\n var nodeId = node.id;\n var data = JSON.parse(row['data']);\n data['random'] = Math.random();\n // return insert to new table\n return \"INSERT INTO proc VALUES('\" + nodeId + \"', '\" + JSON.stringify(data).replaceAll('\"', '\\\"') + \"');\";\n}\nmsg.nodeId = node.id;\nmsg.afterProc = \"SELECT * FROM proc where nodeId = '\" + node.id + \"' LIMIT 10;\";","outputs":1,"noerr":0,"libs":[],"x":910,"y":240,"wires":[["b7d3459fbe6a1e27","0b99e38dd6dcb94b"]]},{"id":"b7d3459fbe6a1e27","type":"table-viewer","z":"4c80175cd8069f06","name":"","property":"payload","fieldType":"msg","width":200,"height":160,"rows":10,"active":true,"outputs":0,"x":910,"y":280,"wires":[]},{"id":"0b99e38dd6dcb94b","type":"duckdb func","z":"4c80175cd8069f06","name":"to table","mydb":"1530b6327e89473f","duckdbfuncbatchsize":100,"duckdbfunc":"msg.beforeProc = \"CREATE TABLE final(id int, name varchar(255), random float);\"\nmsg.procQuery = \"SELECT * FROM proc where nodeId = '\" + msg.nodeId + \"'\";\nmsg.proc = function(row) {\n // transform row from proc query\n var data = JSON.parse(row['data']);\n \n // return insert to new table\n return \"INSERT INTO final VALUES(\" + JSON.stringify(Object.values(data)).slice(1, -1).replaceAll('\"', '\\'') + \");\";\n}\n\nmsg.afterProc = \"SELECT * FROM final LIMIT 10;\";","outputs":1,"noerr":0,"libs":[],"x":1120,"y":240,"wires":[["0194addb780c78f0"]]},{"id":"0194addb780c78f0","type":"table-viewer","z":"4c80175cd8069f06","name":"","property":"payload","fieldType":"msg","width":200,"height":160,"rows":10,"active":true,"outputs":0,"x":1130,"y":280,"wires":[]},{"id":"1530b6327e89473f","type":"duckdb","db":":memory:"}]