node-red-contrib-saxe-edge 2.0.2
Saxe Edge - MQTT batch processing with offline buffering for Node-RED
This plugin provides a configuration node and a buffer node for implementing the Saxe Edge ST1 protocol: buffered sensor data collection, GPS aggregation, batch processing, and reliable MQTT uplink with offline resilience.
Features
- Offline Buffering: SQLite-based outbox for resilient data storage during network outages
- Batch Processing: Automatic batching with 3-limit enforcement (records, bytes, time span)
- Configurable Batch Interval: Adjustable batch send interval (1-300 seconds, default 5)
- Cross-Metric Batching: All metrics for a device are batched together for efficient compression
- GPS Aggregation: Combines separate latitude/longitude readings into atomic ST1 GPS format
- Poison Record Handling: Isolates invalid records to dead_letter table
- Sequence Tracking: Monotonic sequence numbers per device for cloud-side deduplication
- MQTT QoS 1: Reliable delivery with PUBACK confirmation before record deletion
- Edge Presence: Automatic online/offline status via MQTT birth/LWT messages
Installation
Via Node-RED Palette Manager (Recommended)
- Open Node-RED editor
- Click the menu (☰) → Manage palette
- Go to the "Install" tab
- Search for node-red-contrib-saxe-edge
- Click "Install"
Via npm
cd ~/.node-red
npm install node-red-contrib-saxe-edge
Restart Node-RED after installation.
Quick Start
1. Create Configuration Node
- Drag a saxe-edge-buffer node onto your flow
- Double-click it and click the pencil icon to add a new saxe-edge-config
- Configure:
  - Organization: Your org identifier (e.g., acme_corp)
  - Site: Your site identifier (e.g., factory_oslo)
  - MQTT Broker: Broker URL (e.g., wss://mqtt.example.com:443 or mqtt://localhost:1883)
  - MQTT User/Password: Credentials for broker authentication
  - Batch Size: Max records per batch cycle (default: 100)
  - Batch Interval: Seconds between batch sends (default: 5, range: 1-300)
  - Database Path: SQLite database location (default: /edge-data/outbox.db)
- Click "Add" or "Update"
The config node will automatically:
- Generate a unique Edge UUID (persisted to edge-uuid.txt)
- Set global context variables for use in flows
- Publish online/offline presence to ST1/edge/{org}/{site}/{edge_uuid}/online
2. Connect Your Sensors
Send sensor data to the buffer node with the following message format:
msg.payload = {
  value: 23.5,
  unit: "°C"
};
msg.subsystem = "line1/zoneA"; // Optional path (e.g., "area/cell")
msg.device = "PLC-01"; // Device identifier
msg.metric = "temperature"; // Metric name
Device status example:
msg.payload = {
  status: "online",
  ts: new Date().toISOString()
};
msg.domain = "devices";
msg.device = "PLC-01";
msg.metric = "status";
GPS example (send latitude + longitude as separate messages):
msg.payload = { value: 59.9139, unit: "deg" };
msg.device = "gps";
msg.metric = "latitude"; // or "longitude"
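If a GPS receiver delivers both coordinates in one reading, the two messages can be produced in a single function node. The following is a minimal sketch; `gpsFixToMessages` and the `{ lat, lon }` input shape are hypothetical names chosen for illustration, not part of the plugin.

```javascript
// Hypothetical helper: turn one GPS fix into the two messages the
// buffer node expects (one for latitude, one for longitude).
function gpsFixToMessages(fix) {
  const { lat, lon } = fix;
  if (!Number.isFinite(lat) || !Number.isFinite(lon)) {
    throw new TypeError("fix.lat and fix.lon must be finite numbers");
  }
  // Same message shape as the GPS example above.
  const make = (metric, value) => ({
    payload: { value, unit: "deg" },
    device: "gps",
    metric,
  });
  return [make("latitude", lat), make("longitude", lon)];
}

// In a Node-RED function node this could end with:
//   return [gpsFixToMessages(msg.payload)];
// which sends both messages, in order, from the first output.
const gpsMessages = gpsFixToMessages({ lat: 59.9139, lon: 10.7522 });
```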
The buffer node will:
- Transform to ST1 format
- Buffer in SQLite outbox
- Batch on a configurable interval (default: every 5 seconds)
- Group all metrics per device into a single batch message
- Publish to MQTT with QoS 1
- Delete records after PUBACK confirmation
Configuration
Config Node Properties
| Property | Type | Required | Default | Description |
|---|---|---|---|---|
| Organization | string | Yes | - | Organization identifier for MQTT topic |
| Site | string | Yes | - | Site identifier for MQTT topic |
| MQTT Broker | string | Yes | - | Broker URL (mqtt://, mqtts://, ws://, wss://) |
| MQTT User | string | No | - | Optional username for broker authentication |
| MQTT Password | string | No | - | Optional password for broker authentication |
| Batch Size | number | No | 100 | Max records to claim per batch cycle |
| Batch Interval | number | No | 5 | Seconds between batch sends (range: 1-300) |
| Database Path | string | Yes | /edge-data/outbox.db | Path to SQLite database file |
Global Context Variables
The config node sets these global context variables for use in flows:
- EDGE_UUID: Unique edge device identifier
- ORG: Organization identifier
- SITE: Site identifier
- MQTT_BROKER: MQTT broker URL
- MQTT_USE_TLS: Boolean, true for mqtts:// or wss://
- BATCH_SIZE: Max records per batch
- BATCH_INTERVAL: Seconds between batch sends
- DB_PATH: Database file path
Access in function nodes:
const edgeUuid = global.get('EDGE_UUID');
const org = global.get('ORG');
const site = global.get('SITE');
const batchInterval = global.get('BATCH_INTERVAL');
Database Schema
The plugin creates three tables:
outbox - Buffered Records
| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment ID |
| topic | TEXT | MQTT topic |
| payload | TEXT | JSON payload |
| created_at | TEXT | ISO 8601 timestamp |
| sent | INTEGER | 0=pending, 1=sent, 2=inflight |
Indexes: (sent, id), (created_at)
seq_state - Sequence Tracking
| Column | Type | Description |
|---|---|---|
| group_key | TEXT PK | Unique per {org}/{site}/{subsystem}/{device} |
| seq | INTEGER | Last sequence number used |
dead_letter - Poison Records
| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment ID |
| original_id | INTEGER | Original outbox record ID |
| topic | TEXT | Original MQTT topic |
| payload | TEXT | Original JSON payload |
| error | TEXT | Error description |
| created_at | TEXT | Original creation timestamp |
| moved_at | TEXT | Timestamp when moved to dead_letter |
MQTT Topic Structure
Sensor Data Batch
ST1/batch/sensors/{org}/{site}/{subsystem_path?}/{device}
Example: ST1/batch/sensors/acme_corp/factory_oslo/line1/zoneA/PLC-01
Device Status Batch
ST1/batch/devices/{org}/{site}/{subsystem_path?}/{device}/status
Example: ST1/batch/devices/acme_corp/factory_oslo/line1/zoneA/PLC-01/status
GPS Batch
ST1/batch/sensors/{org}/{site}/gps
Example: ST1/batch/sensors/acme_corp/factory_oslo/gps
Edge Presence
ST1/edge/{org}/{site}/{edge_uuid}/online
Presence payload includes {online, ts, edge_uuid, org, site}.
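For a subscriber or a test harness, the topic patterns above can be assembled with a few small helpers. This is a sketch only: the function names are hypothetical, and it assumes an absent subsystem path is simply omitted from the topic, as the `?` in `{subsystem_path?}` suggests.

```javascript
// Join topic segments with "/", skipping any that are missing or empty
// (used for the optional subsystem path).
function joinTopic(...segments) {
  return segments
    .filter((s) => s !== undefined && s !== null && s !== "")
    .join("/");
}

function sensorBatchTopic({ org, site, subsystem, device }) {
  return joinTopic("ST1/batch/sensors", org, site, subsystem, device);
}

function deviceStatusTopic({ org, site, subsystem, device }) {
  return joinTopic("ST1/batch/devices", org, site, subsystem, device, "status");
}

function gpsBatchTopic({ org, site }) {
  return joinTopic("ST1/batch/sensors", org, site, "gps");
}

function presenceTopic({ org, site, edgeUuid }) {
  return joinTopic("ST1/edge", org, site, edgeUuid, "online");
}

const exampleIds = {
  org: "acme_corp",
  site: "factory_oslo",
  subsystem: "line1/zoneA",
  device: "PLC-01",
};
console.log(sensorBatchTopic(exampleIds));
console.log(deviceStatusTopic(exampleIds));
```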
Batch Payload Format (ST1 v7)
Sensor Data
A sensor batch groups all metrics for a single device. The records array can contain different metrics (e.g., temperature, pressure, vibration) in the same batch.
{
  "v": 1,
  "org": "acme_corp",
  "site": "factory_oslo",
  "subsystem": "line1/zoneA",
  "device": "PLC-01",
  "edge_uuid": "87a70b51-b370-427c-bc7e-06ee62dcea61",
  "from": "2026-01-08T10:00:00Z",
  "to": "2026-01-08T10:00:05Z",
  "seq": 42,
  "records": [
    {
      "ts": "2026-01-08T10:00:01Z",
      "metric": "temperature",
      "value": 23.5,
      "unit": "°C"
    },
    {
      "ts": "2026-01-08T10:00:01Z",
      "metric": "pressure",
      "value": 1.013,
      "unit": "bar"
    },
    {
      "ts": "2026-01-08T10:00:03Z",
      "metric": "temperature",
      "value": 23.6,
      "unit": "°C"
    }
  ]
}
Note: edge_uuid is automatically included in all batch payloads. This enables the cloud to track which edge device sent the data, supporting scenarios with multiple edge devices per site.
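Because delivery uses QoS 1, the same batch may reach a consumer more than once. The sketch below shows one way a cloud-side consumer might use `edge_uuid` and `seq` to drop repeats. The key layout, the `createDeduplicator` name, and the unbounded in-memory Set are all assumptions for illustration; a real consumer would persist and bound this state.

```javascript
// Hypothetical consumer-side filter. The key layout and the in-memory Set
// are assumptions for illustration, not part of the ST1 specification.
function createDeduplicator() {
  const seen = new Set();
  return function isDuplicate(batch) {
    // GPS batches carry no subsystem/device, so fall back to fixed values.
    const key = JSON.stringify([
      batch.edge_uuid,
      batch.org,
      batch.site,
      batch.subsystem ?? "",
      batch.device ?? "gps",
      batch.seq,
    ]);
    if (seen.has(key)) return true;
    seen.add(key);
    return false;
  };
}

const isDuplicate = createDeduplicator();
const sampleBatch = {
  edge_uuid: "87a70b51-b370-427c-bc7e-06ee62dcea61",
  org: "acme_corp",
  site: "factory_oslo",
  subsystem: "line1/zoneA",
  device: "PLC-01",
  seq: 42,
};
const firstSeen = isDuplicate(sampleBatch);  // first delivery
const secondSeen = isDuplicate(sampleBatch); // redelivery of the same batch
```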
GPS Data
{
  "v": 1,
  "org": "acme_corp",
  "site": "factory_oslo",
  "edge_uuid": "87a70b51-b370-427c-bc7e-06ee62dcea61",
  "seq": 12,
  "records": [
    {
      "ts": "2026-01-08T10:00:00Z",
      "lat": 59.9139,
      "lon": 10.7522
    }
  ]
}
Batch Processing Rules
Batches are closed when ANY of these limits is reached:
- Max Records: 5000 records per batch
- Max Bytes: 1 MB payload size
- Max Time Span: 5 seconds between first and last record
Batch interval is configurable (default: 5 seconds, range: 1-300 seconds). Higher intervals collect more records per batch for better compression but increase latency.
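The three limits can be illustrated with a small splitter over time-ordered records. This is a sketch under stated assumptions, not the plugin's code: `splitIntoBatches` and `LIMITS` are hypothetical names, 1 MB is taken as 1024 × 1024 bytes, and the byte count covers only the serialized records, not the batch envelope.

```javascript
// Limits from the list above. Assumption: 1 MB = 1024 * 1024 bytes.
const LIMITS = { maxRecords: 5000, maxBytes: 1024 * 1024, maxSpanMs: 5000 };

// Split time-ordered records into batches. The current batch is closed as
// soon as adding the next record would exceed any one of the three limits.
function splitIntoBatches(records, limits = LIMITS) {
  const batches = [];
  let current = [];
  let bytes = 0;

  for (const record of records) {
    const size = Buffer.byteLength(JSON.stringify(record), "utf8");
    const spanMs =
      current.length === 0
        ? 0
        : Date.parse(record.ts) - Date.parse(current[0].ts);

    const wouldOverflow =
      current.length > 0 &&
      (current.length + 1 > limits.maxRecords ||
        bytes + size > limits.maxBytes ||
        spanMs > limits.maxSpanMs);

    if (wouldOverflow) {
      batches.push(current);
      current = [];
      bytes = 0;
    }
    current.push(record);
    bytes += size;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}

// Records at 0 s, 1 s, 3 s, 6 s and 7 s: the fourth record is more than
// 5 s after the first, so it starts a second batch.
const sampleRecords = [0, 1, 3, 6, 7].map((s) => ({
  ts: `2026-01-08T10:00:0${s}Z`,
  metric: "temperature",
  value: 23.5,
  unit: "°C",
}));
const sampleBatches = splitIntoBatches(sampleRecords);
```

A single record larger than the byte limit still ends up in a batch of its own here; how the plugin treats such a record is not covered by this sketch.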
Grouping Rules
| Data Type | Grouping Key | Result |
|---|---|---|
| Sensor | {domain, org, site, subsystem, device} | All metrics for one device in one batch |
| Device status | {domain, org, site, subsystem, device} | Status records per device |
| GPS | {domain, org, site} | All GPS readings for the site |
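The grouping keys in the table can be sketched as a key function plus a Map. The names `groupingKey` and `groupRecords` are hypothetical, and recognising GPS records by `device === "gps"` is an assumption taken from the GPS example; the plugin may detect them differently.

```javascript
// Build the grouping key for one buffered record.
// Assumption: GPS records are recognised by device === "gps".
function groupingKey(record) {
  const { domain, org, site, subsystem = "", device } = record;
  const parts =
    device === "gps"
      ? [domain, org, site]
      : [domain, org, site, subsystem, device];
  // JSON encoding keeps the key unambiguous even if a part contains "/".
  return JSON.stringify(parts);
}

// Collect records into one array per grouping key.
function groupRecords(records) {
  const groups = new Map();
  for (const record of records) {
    const key = groupingKey(record);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(record);
  }
  return groups;
}

const base = { domain: "sensors", org: "acme_corp", site: "factory_oslo" };
const grouped = groupRecords([
  { ...base, subsystem: "line1/zoneA", device: "PLC-01", metric: "temperature" },
  { ...base, subsystem: "line1/zoneA", device: "PLC-01", metric: "pressure" },
  { ...base, subsystem: "line1/zoneA", device: "PLC-02", metric: "temperature" },
  { ...base, device: "gps", lat: 59.9139, lon: 10.7522 },
]);
```

In this example the two PLC-01 records share one group, so different metrics for the same device travel in the same batch.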
Troubleshooting
Database Permission Errors
If you see "SQLITE_READONLY" errors:
# Ensure directory exists and is writable
mkdir -p /edge-data
chmod 755 /edge-data
# If database file exists with wrong permissions
sudo chown $(whoami):$(whoami) /edge-data/outbox.db
chmod 644 /edge-data/outbox.db
MQTT Connection Failures
- Check broker URL format: Must include protocol (mqtt://, mqtts://, ws://, wss://)
- Verify credentials: Ensure username/password are correct
- Test connectivity: Use mosquitto_pub to verify broker is reachable
- Check TLS: For mqtts:// or wss://, ensure certificates are valid
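The first check in the list can be automated before the URL is entered in the config node. A minimal sketch using Node's built-in `URL` class follows; `checkBrokerUrl` is a hypothetical helper, and the TLS flag mirrors the MQTT_USE_TLS rule (true for mqtts:// or wss://).

```javascript
const ALLOWED_PROTOCOLS = new Set(["mqtt:", "mqtts:", "ws:", "wss:"]);

// Validate a broker URL and report whether it implies TLS.
function checkBrokerUrl(brokerUrl) {
  let url;
  try {
    url = new URL(brokerUrl);
  } catch {
    return { ok: false, reason: "not a valid URL (is the protocol missing?)" };
  }
  if (!ALLOWED_PROTOCOLS.has(url.protocol)) {
    // "localhost:1883" lands here: it parses with "localhost:" as the scheme.
    return { ok: false, reason: `unsupported protocol "${url.protocol}"` };
  }
  return {
    ok: true,
    useTls: url.protocol === "mqtts:" || url.protocol === "wss:",
  };
}

console.log(checkBrokerUrl("wss://mqtt.example.com:443"));
console.log(checkBrokerUrl("localhost:1883"));
```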
Batches Not Sending
- Check outbox: Query database to see if records are accumulating:
  sqlite3 /edge-data/outbox.db "SELECT COUNT(*) FROM outbox WHERE sent = 0;"
- Check debug output: Add a debug node after sensors to verify messages arrive
- Verify MQTT connection: Check if config node shows "connected" in Node-RED log
- Check batch interval: The internal timer runs at the configured batch interval (default: 5 seconds)
Docker Deployment
For containerized deployments:
services:
  nodered:
    image: nodered/node-red:latest
    volumes:
      - nodered-data:/data
      - ./edge-data:/edge-data
    environment:
      - ORG=acme_corp
      - SITE=factory_oslo
      - MQTT_BROKER=wss://mqtt.example.com:443
      - MQTT_USER=edge_user
      - MQTT_PASSWORD=secret
      - BATCH_SIZE=100
      - BATCH_INTERVAL=10
All configuration properties support Node-RED's ${ENV_VAR} syntax for environment variable substitution.
Architecture
Data flow through the buffer node:
- Sensors (Modbus, GPS, etc.) send messages to the buffer node
- Transform -- incoming data is converted to ST1 format; GPS lat/lon are aggregated into atomic {ts, lat, lon}
- SQLite Outbox -- records are appended to the outbox table for offline resilience
- Batch Cycle -- a configurable timer (default: 5s) triggers batch processing
- Claim Records -- atomic UPDATE...RETURNING claims pending records
- Build Batches -- records are grouped by device; all metrics for one device go into one batch; 3-limit enforcement (records, bytes, time span)
- Poison Handling -- invalid records are moved to dead_letter table
- Sequence Assignment -- monotonic seq number per device via UPSERT
- Format ST1 -- final JSON batch payload is built
- MQTT Publish -- batch is published with QoS 1
- PUBACK -- broker confirms receipt
- Delete Records -- confirmed records are removed from outbox
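The GPS aggregation in the Transform step can be pictured with the sketch below, which pairs separate latitude and longitude readings into one {ts, lat, lon} record. The pairing rule is an assumption made for illustration: the two halves must arrive within `maxSkewMs` of each other, and the later timestamp is used. The plugin's actual rule may differ, and `createGpsAggregator` is a hypothetical name.

```javascript
// Pair latitude and longitude readings into one atomic GPS record.
// Assumed rule: both halves must be within maxSkewMs of each other.
function createGpsAggregator(maxSkewMs = 1000) {
  let pending = {}; // may hold a waiting latitude and/or longitude reading

  return function push(metric, value, ts) {
    if (metric !== "latitude" && metric !== "longitude") return null;
    pending[metric] = { value, ts, ms: Date.parse(ts) };

    const { latitude, longitude } = pending;
    if (!latitude || !longitude) return null; // still waiting for the other half

    if (Math.abs(latitude.ms - longitude.ms) > maxSkewMs) {
      // Too far apart: keep only the newer half and wait for its partner.
      pending = latitude.ms > longitude.ms ? { latitude } : { longitude };
      return null;
    }

    pending = {};
    const newer = latitude.ms >= longitude.ms ? latitude : longitude;
    return { ts: newer.ts, lat: latitude.value, lon: longitude.value };
  };
}

const pushGps = createGpsAggregator();
const afterLat = pushGps("latitude", 59.9139, "2026-01-08T10:00:00Z");
const afterLon = pushGps("longitude", 10.7522, "2026-01-08T10:00:00Z");
```

Here `afterLat` is `null` because only one half has arrived, and `afterLon` is the combined record.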
License
Saxe Noncommercial No-Derivatives License. See LICENSE. Commercial use is permitted only for Saxe and Saxe Customers as defined in the license. Noncommercial users may use and distribute unmodified copies only.
Support
- npm: https://www.npmjs.com/package/node-red-contrib-saxe-edge
- Specification: See ST1_MQTT_SPEC_v7.md for ST1 protocol details
Changelog
See CHANGELOG.md for version history.
Saxe Edge - Reliable IoT data collection at the edge.