node-red-contrib-mqtt-tdengine 1.0.3
A Node-RED node for writing MQTT data to TDengine database with batch insert support and enhanced error handling
Node-RED MQTT to TDengine
A Node-RED custom node for writing MQTT message data to TDengine time-series database. Uses the official TDengine WebSocket connector for efficient data transmission with batch insert support.
Features
- ๐ MQTT Subscription: Subscribe to MQTT topics and receive real-time data
- ๐ TDengine Integration: Uses official
@tdengine/websocketconnector, supports local and cloud TDengine - โก Batch Insert: High-performance batch insert mode with configurable batch size and timeout
- ๐ฏ Smart Variable Replacement: Supports JSON field extraction from MQTT payloads
- โ๏ธ Flexible Configuration: Custom SQL templates with variable substitution
- ๐ Real-time Status: Connection status and data processing results display
- ๐ก๏ธ Enhanced Error Handling: Comprehensive error handling with retry mechanisms and fallback strategies
Installation
Method 1: Install via Node-RED Palette Manager
- Open Node-RED management interface
- Click the menu in the top right corner โ Manage palette
- Select the "Install" tab
- Search for
node-red-contrib-mqtt-tdengine - Click Install
Method 2: Install via npm
cd ~/.node-red
npm install node-red-contrib-mqtt-tdengine
Configuration
MQTT Configuration
- MQTT Server: MQTT broker server address, e.g.,
mqtt://localhost:1883 - Topic: MQTT topic to subscribe to, e.g.,
sensor/temperature - QoS: Message quality level (0, 1, 2)
TDengine Configuration
- WebSocket URL: TDengine WebSocket connection address
- Local deployment:
ws://localhost:6041 - TDengine Cloud:
wss://your-instance.cloud.tdengine.com
- Local deployment:
- Database: TDengine database name
- Table: Target data table name
Batch Insert Configuration
- Enable Batch: Toggle batch insert mode for better performance
- Batch Size: Number of records to batch together (1-1000, default: 10)
- Batch Timeout: Maximum time to wait before executing batch (1-60 seconds, default: 1)
SQL Template Configuration
SQL templates support the following variable substitutions:
${payload}: Complete MQTT message content${topic}: MQTT topic${table}: Configured table name${database}: Configured database name${fieldname}: JSON field values from payload (e.g.,${co2},${pm25})
Example Templates:
Single insert mode:
INSERT INTO ${table} (createtime, co2, pm25) VALUES (NOW, ${co2}, ${pm25})
For JSON payload like: {"co2": 400, "pm25": 35}
Usage Examples
1. Basic Configuration
- Drag the
mqtt-tdenginenode to the workspace - Double-click the node to configure:
- MQTT Server:
mqtt://localhost:1883 - Topic:
sensor/data - TDengine WebSocket URL:
ws://localhost:6041 - Database:
iot_data - Table:
air_sensor_001 - SQL Template:
INSERT INTO ${table} (createtime, co2, pm25) VALUES (NOW, ${co2}, ${pm25})
- MQTT Server:
2. Batch Insert Mode
Enable batch insert for high-throughput scenarios:
- Enable Batch: โ
- Batch Size: 100
- Batch Timeout: 5 seconds
This will collect up to 100 records or wait 5 seconds before executing a batch insert.
3. Performance Comparison
Based on testing:
- Single Insert: ~100 records/second
- Batch Insert: ~14,700 records/second (147x improvement)
- Batch Processing: 1000 records in 68ms
TDengine Table Structure Example
-- Create database
CREATE DATABASE IF NOT EXISTS iot_data;
-- Use database
USE iot_data;
-- Create table for air sensor data
CREATE TABLE IF NOT EXISTS air_sensor_001 (
createtime TIMESTAMP,
co2 INT,
pm25 INT
);
-- Create super table for multiple sensors
CREATE STABLE IF NOT EXISTS air_sensors (
createtime TIMESTAMP,
co2 INT,
pm25 INT,
temperature FLOAT,
humidity FLOAT
) TAGS (
device_id NCHAR(32),
location NCHAR(64)
);
Output Message Format
The node outputs messages containing execution results:
Success (Single Insert)
{
"payload": {
"success": true,
"message": "ๆฐๆฎๆๅ
ฅๆๅ",
"sql": "INSERT INTO air_sensor_001 (createtime, co2, pm25) VALUES (NOW, 400, 35)",
"result": {...},
"originalTopic": "sensor/air",
"originalPayload": "{\"co2\": 400, \"pm25\": 35}"
}
}
Success (Batch Insert)
{
"payload": {
"success": true,
"message": "ๆน้ๆๅ
ฅๆๅ: 100 ๆกๆฐๆฎ",
"table": "air_sensor_001",
"count": 100,
"result": {...}
}
}
Error
{
"payload": {
"success": false,
"error": "TDengineๆง่กๅคฑ่ดฅ: Invalid SQL",
"sql": "INSERT INTO air_sensor_001 VALUES (NOW, 400, 35)",
"originalTopic": "sensor/air",
"originalPayload": "{\"co2\": 400, \"pm25\": 35}"
}
}
Troubleshooting
Common Issues
MQTT Connection Failed
- Check MQTT server address and port
- Verify network connectivity
- Check firewall settings
TDengine Connection Failed
- Ensure TDengine service is running
- Check WebSocket port (default 6041)
- Verify username and password
SQL Execution Failed
- Check if database and table exist
- Verify SQL syntax correctness
- Ensure data type matching
Variable Replacement Issues
- Ensure MQTT payload is valid JSON for field extraction
- Check variable names match JSON field names exactly
- Use debug node to inspect actual payload content
Debugging Methods
- Check Node-RED debug panel log output
- Monitor node status indicators
- Test SQL statements directly with TDengine client
- Use the included test scripts to verify functionality
Dependencies
- Node.js: >= 14.0.0
- Node-RED: >= 2.0.0
- @tdengine/websocket: ^3.1.0
- mqtt: ^4.3.7
- axios: ^1.12.2
License
MIT License
Contributing
Issues and Pull Requests are welcome to improve this project.
Changelog
v1.0.0
- ๐ Initial release
- ๐ก MQTT subscription and TDengine integration
- โก Batch insert support with configurable parameters
- ๐ฏ Smart JSON field variable replacement
- ๐ก๏ธ Enhanced error handling and retry mechanisms
- ๐ Automatic connection management and recovery
- ๐ Real-time status monitoring
- ๐งช Comprehensive test coverage