node-red-contrib-saxe-edge 2.0.2

Saxe Edge - MQTT batch processing with offline buffering for Node-RED

npm install node-red-contrib-saxe-edge

This plugin provides a configuration node and a buffer node for implementing the Saxe Edge ST1 protocol: buffered sensor data collection, GPS aggregation, batch processing, and reliable MQTT uplink with offline resilience.

Features

  • Offline Buffering: SQLite-based outbox for resilient data storage during network outages
  • Batch Processing: Automatic batching with 3-limit enforcement (records, bytes, time span)
  • Configurable Batch Interval: Adjustable batch send interval (1-300 seconds, default 5)
  • Cross-Metric Batching: All metrics for a device are batched together for efficient compression
  • GPS Aggregation: Combines separate latitude/longitude readings into atomic ST1 GPS format
  • Poison Record Handling: Isolates invalid records to dead_letter table
  • Sequence Tracking: Monotonic sequence numbers per device for cloud-side deduplication
  • MQTT QoS 1: Reliable delivery with PUBACK confirmation before record deletion
  • Edge Presence: Automatic online/offline status via MQTT birth/LWT messages

Installation

Via Node-RED Palette Manager (Recommended)

  1. Open Node-RED editor
  2. Click the menu (☰) → Manage palette
  3. Go to the "Install" tab
  4. Search for node-red-contrib-saxe-edge
  5. Click "Install"

Via npm

cd ~/.node-red
npm install node-red-contrib-saxe-edge

Restart Node-RED after installation.

Quick Start

1. Create Configuration Node

  1. Drag a saxe-edge-buffer node onto your flow
  2. Double-click it and click the pencil icon to add a new saxe-edge-config
  3. Configure:
    • Organization: Your org identifier (e.g., acme_corp)
    • Site: Your site identifier (e.g., factory_oslo)
    • MQTT Broker: Broker URL (e.g., wss://mqtt.example.com:443 or mqtt://localhost:1883)
    • MQTT User/Password: Credentials for broker authentication
    • Batch Size: Max records per batch cycle (default: 100)
    • Batch Interval: Seconds between batch sends (default: 5, range: 1-300)
    • Database Path: SQLite database location (default: /edge-data/outbox.db)
  4. Click "Add" or "Update"

The config node will automatically:

  • Generate a unique Edge UUID (persisted to edge-uuid.txt)
  • Set global context variables for use in flows
  • Publish online/offline presence to ST1/edge/{org}/{site}/{edge_uuid}/online

2. Connect Your Sensors

Send sensor data to the buffer node with the following message format:

msg.payload = {
    value: 23.5,
    unit: "°C"
};
msg.subsystem = "line1/zoneA"; // Optional path (e.g., "area/cell")
msg.device = "PLC-01";         // Device identifier
msg.metric = "temperature";    // Metric name

Device status example:

msg.payload = {
    status: "online",
    ts: new Date().toISOString()
};
msg.domain = "devices";
msg.device = "PLC-01";
msg.metric = "status";

GPS example (send latitude + longitude as separate messages):

msg.payload = { value: 59.9139, unit: "deg" };
msg.device = "gps";
msg.metric = "latitude"; // or "longitude"
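
If a GPS receiver delivers both coordinates in a single reading, a function node can split it into the two messages shown above. This is a minimal sketch, assuming a hypothetical input shape `{ lat, lon }` in `msg.payload`; `gpsFixToMessages` is an illustrative name and not part of the plugin.

```javascript
// Hypothetical helper: split one GPS fix into the two messages the buffer node expects.
// The input shape { lat, lon } is an assumption about the upstream GPS node.
function gpsFixToMessages(fix) {
    return [
        { payload: { value: fix.lat, unit: "deg" }, device: "gps", metric: "latitude" },
        { payload: { value: fix.lon, unit: "deg" }, device: "gps", metric: "longitude" }
    ];
}

// Inside a Node-RED function node, returning an array nested in an array
// sends both messages, in order, on the first output:
//   return [gpsFixToMessages(msg.payload)];
```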

The buffer node will:

  1. Transform to ST1 format
  2. Buffer in SQLite outbox
  3. Batch on a configurable interval (default: every 5 seconds)
  4. Group all metrics per device into a single batch message
  5. Publish to MQTT with QoS 1
  6. Delete records after PUBACK confirmation

Configuration

Config Node Properties

| Property | Type | Required | Default | Description |
|---|---|---|---|---|
| Organization | string | Yes | - | Organization identifier for MQTT topic |
| Site | string | Yes | - | Site identifier for MQTT topic |
| MQTT Broker | string | Yes | - | Broker URL (mqtt://, mqtts://, ws://, wss://) |
| MQTT User | string | No | - | Optional username for broker authentication |
| MQTT Password | string | No | - | Optional password for broker authentication |
| Batch Size | number | No | 100 | Max records to claim per batch cycle |
| Batch Interval | number | No | 5 | Seconds between batch sends (range: 1-300) |
| Database Path | string | Yes | /edge-data/outbox.db | Path to SQLite database file |
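
The constraints in this table can be checked before a flow is deployed. The sketch below is one way to do that, assuming a hypothetical `normalizeConfig` helper and hypothetical property names (`org`, `site`, `broker`, and so on). How the plugin itself treats out-of-range values is not documented here, so this version simply rejects them.

```javascript
// Protocols listed for the MQTT Broker property.
const ALLOWED_PROTOCOLS = ["mqtt:", "mqtts:", "ws:", "wss:"];

// Hypothetical helper: apply the documented defaults and check the documented ranges.
function normalizeConfig(raw) {
    for (const key of ["org", "site", "broker"]) {
        if (!raw[key]) throw new Error(`Missing required property: ${key}`);
    }
    const protocol = new URL(raw.broker).protocol; // throws on an unparseable URL
    if (!ALLOWED_PROTOCOLS.includes(protocol)) {
        throw new Error(`Unsupported broker protocol: ${protocol}`);
    }
    const batchSize = raw.batchSize ?? 100;
    const batchInterval = raw.batchInterval ?? 5;
    if (!Number.isFinite(batchInterval) || batchInterval < 1 || batchInterval > 300) {
        throw new Error("Batch Interval must be between 1 and 300 seconds");
    }
    return {
        org: raw.org,
        site: raw.site,
        broker: raw.broker,
        user: raw.user,
        password: raw.password,
        batchSize,
        batchInterval,
        dbPath: raw.dbPath ?? "/edge-data/outbox.db",
        // TLS is implied by the secure variants of the protocol.
        useTls: protocol === "mqtts:" || protocol === "wss:"
    };
}
```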

Global Context Variables

The config node sets these global context variables for use in flows:

  • EDGE_UUID: Unique edge device identifier
  • ORG: Organization identifier
  • SITE: Site identifier
  • MQTT_BROKER: MQTT broker URL
  • MQTT_USE_TLS: Boolean, true for mqtts:// or wss://
  • BATCH_SIZE: Max records per batch
  • BATCH_INTERVAL: Seconds between batch sends
  • DB_PATH: Database file path

Access in function nodes:

const edgeUuid = global.get('EDGE_UUID');
const org = global.get('ORG');
const site = global.get('SITE');
const batchInterval = global.get('BATCH_INTERVAL');

Database Schema

The plugin creates three tables:

outbox - Buffered Records

| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment ID |
| topic | TEXT | MQTT topic |
| payload | TEXT | JSON payload |
| created_at | TEXT | ISO 8601 timestamp |
| sent | INTEGER | 0=pending, 1=sent, 2=inflight |

Indexes: (sent, id), (created_at)
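
For orientation, the outbox table and its indexes could be declared roughly as follows. This is a reconstruction from the column list, not the plugin's actual DDL: the index names, the `DEFAULT 0`, and the `CLAIM_SQL` statement (which needs SQLite 3.35 or later for `RETURNING`) are all assumptions. The statements are kept as plain strings so they can be handed to whichever SQLite binding is in use.

```javascript
// Assumed DDL matching the documented columns and indexes.
const OUTBOX_SCHEMA_SQL = [
    `CREATE TABLE IF NOT EXISTS outbox (
        id         INTEGER PRIMARY KEY AUTOINCREMENT,
        topic      TEXT,
        payload    TEXT,
        created_at TEXT,
        sent       INTEGER DEFAULT 0
    )`,
    "CREATE INDEX IF NOT EXISTS idx_outbox_sent_id ON outbox (sent, id)",
    "CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at)"
];

// Assumed claim statement: flip up to :limit pending rows (0) to inflight (2)
// and return them in a single atomic statement.
const CLAIM_SQL = `
    UPDATE outbox SET sent = 2
    WHERE id IN (SELECT id FROM outbox WHERE sent = 0 ORDER BY id LIMIT :limit)
    RETURNING id, topic, payload, created_at`;
```

The `(sent, id)` index suits this access pattern, since the inner query filters on `sent` and orders by `id`.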

seq_state - Sequence Tracking

| Column | Type | Description |
|---|---|---|
| group_key | TEXT PK | Unique per {org}/{site}/{subsystem}/{device} |
| seq | INTEGER | Last sequence number used |
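
Sequence assignment can be pictured as one counter per group key. The sketch below pairs an UPSERT statement that would fit this table (an assumption, not the plugin's actual query) with an in-memory equivalent that is easy to run. `nextSeq` is a hypothetical name, and starting the sequence at 1 is also an assumption, since the first value is not stated in this document.

```javascript
// Assumed SQLite statement: insert the first sequence number or bump the stored one.
const NEXT_SEQ_SQL = `
    INSERT INTO seq_state (group_key, seq) VALUES (:group_key, 1)
    ON CONFLICT(group_key) DO UPDATE SET seq = seq + 1
    RETURNING seq`;

// In-memory stand-in for the seq_state table: group_key -> last sequence number used.
const seqState = new Map();

function nextSeq(groupKey) {
    const seq = (seqState.get(groupKey) ?? 0) + 1;
    seqState.set(groupKey, seq);
    return seq;
}
```

Because QoS 1 is at-least-once delivery, a consumer may see the same batch twice; comparing `seq` against the last value stored for the same group is one way to discard the repeat.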

dead_letter - Poison Records

| Column | Type | Description |
|---|---|---|
| id | INTEGER PK | Auto-increment ID |
| original_id | INTEGER | Original outbox record ID |
| topic | TEXT | Original MQTT topic |
| payload | TEXT | Original JSON payload |
| error | TEXT | Error description |
| created_at | TEXT | Original creation timestamp |
| moved_at | TEXT | Timestamp when moved to dead_letter |
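
Poison handling amounts to partitioning claimed rows into usable records and rows that cannot be processed. The following is a minimal sketch, assuming a row counts as invalid when its payload is not parseable JSON or lacks a string `ts` field. The plugin's real validation rules are not listed in this document, and `partitionClaimed` is a hypothetical name.

```javascript
// Hypothetical helper: split claimed outbox rows into valid records and dead_letter rows.
function partitionClaimed(rows, now = new Date()) {
    const valid = [];
    const deadLetter = [];
    for (const row of rows) {
        try {
            const record = JSON.parse(row.payload); // throws SyntaxError on bad JSON
            if (record === null || typeof record !== "object" || typeof record.ts !== "string") {
                throw new Error("payload has no ts field");
            }
            valid.push({ id: row.id, topic: row.topic, record });
        } catch (err) {
            // Shape mirrors the dead_letter columns described above.
            deadLetter.push({
                original_id: row.id,
                topic: row.topic,
                payload: row.payload,
                error: err.message,
                created_at: row.created_at,
                moved_at: now.toISOString()
            });
        }
    }
    return { valid, deadLetter };
}
```

Keeping the original payload and timestamp alongside the error text means a bad row can be inspected later without blocking the rest of the batch.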

MQTT Topic Structure

Sensor Data Batch

ST1/batch/sensors/{org}/{site}/{subsystem_path?}/{device}

Example: ST1/batch/sensors/acme_corp/factory_oslo/line1/zoneA/PLC-01

Device Status Batch

ST1/batch/devices/{org}/{site}/{subsystem_path?}/{device}/status

Example: ST1/batch/devices/acme_corp/factory_oslo/line1/zoneA/PLC-01/status

GPS Batch

ST1/batch/sensors/{org}/{site}/gps

Example: ST1/batch/sensors/acme_corp/factory_oslo/gps

Edge Presence

ST1/edge/{org}/{site}/{edge_uuid}/online

Presence payload includes {online, ts, edge_uuid, org, site}.
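
The topic patterns in this section can be expressed as small builder functions, which is handy when subscribing from a test client. This is a sketch with hypothetical function names; it assumes that an absent subsystem path is simply omitted from the topic and that the presence `ts` is an ISO 8601 string.

```javascript
// Join the non-empty segments with "/" under the ST1 prefix.
function st1Topic(...segments) {
    return ["ST1", ...segments].filter(Boolean).join("/");
}

const sensorTopic = ({ org, site, subsystem, device }) =>
    st1Topic("batch", "sensors", org, site, subsystem, device);

const deviceStatusTopic = ({ org, site, subsystem, device }) =>
    st1Topic("batch", "devices", org, site, subsystem, device, "status");

const gpsTopic = ({ org, site }) => st1Topic("batch", "sensors", org, site, "gps");

const presenceTopic = ({ org, site, edgeUuid }) =>
    st1Topic("edge", org, site, edgeUuid, "online");

// Presence payload with the documented fields {online, ts, edge_uuid, org, site}.
function presencePayload({ org, site, edgeUuid }, online, now = new Date()) {
    return { online, ts: now.toISOString(), edge_uuid: edgeUuid, org, site };
}
```

For example, `sensorTopic({ org: "acme_corp", site: "factory_oslo", subsystem: "line1/zoneA", device: "PLC-01" })` yields the sensor batch example topic shown earlier in this section.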

Batch Payload Format (ST1 v7)

Sensor Data

A sensor batch groups all metrics for a single device. The records array can contain different metrics (e.g., temperature, pressure, vibration) in the same batch.

{
  "v": 1,
  "org": "acme_corp",
  "site": "factory_oslo",
  "subsystem": "line1/zoneA",
  "device": "PLC-01",
  "edge_uuid": "87a70b51-b370-427c-bc7e-06ee62dcea61",
  "from": "2026-01-08T10:00:00Z",
  "to": "2026-01-08T10:00:05Z",
  "seq": 42,
  "records": [
    {
      "ts": "2026-01-08T10:00:01Z",
      "metric": "temperature",
      "value": 23.5,
      "unit": "°C"
    },
    {
      "ts": "2026-01-08T10:00:01Z",
      "metric": "pressure",
      "value": 1.013,
      "unit": "bar"
    },
    {
      "ts": "2026-01-08T10:00:03Z",
      "metric": "temperature",
      "value": 23.6,
      "unit": "°C"
    }
  ]
}

Note: edge_uuid is automatically included in all batch payloads. This enables the cloud to track which edge device sent the data, supporting scenarios with multiple edge devices per site.
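
To make the envelope concrete, the sketch below assembles a payload of this shape from its parts. `buildSensorBatch` is a hypothetical name, and the check that every record timestamp lies inside the `from`/`to` window is an assumption drawn from the example above rather than a documented rule.

```javascript
// Hypothetical helper: wrap records in the sensor batch envelope shown above.
function buildSensorBatch(meta, window, seq, records) {
    const fromMs = Date.parse(window.from);
    const toMs = Date.parse(window.to);
    for (const record of records) {
        const tsMs = Date.parse(record.ts);
        // Assumed invariant: records never fall outside the batch window.
        if (Number.isNaN(tsMs) || tsMs < fromMs || tsMs > toMs) {
            throw new Error(`record timestamp outside batch window: ${record.ts}`);
        }
    }
    return {
        v: 1,
        org: meta.org,
        site: meta.site,
        subsystem: meta.subsystem,
        device: meta.device,
        edge_uuid: meta.edgeUuid,
        from: window.from,
        to: window.to,
        seq,
        records
    };
}
```

Timestamps are compared through `Date.parse` rather than as strings, so records with and without fractional seconds are ordered correctly.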

GPS Data

{
  "v": 1,
  "org": "acme_corp",
  "site": "factory_oslo",
  "edge_uuid": "87a70b51-b370-427c-bc7e-06ee62dcea61",
  "seq": 12,
  "records": [
    {
      "ts": "2026-01-08T10:00:00Z",
      "lat": 59.9139,
      "lon": 10.7522
    }
  ]
}
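
GPS aggregation pairs the separately received latitude and longitude values into one record of this shape. The sketch below illustrates the idea only: the pairing rule (emit once both halves are present, stamp with the timestamp of the completing message, then reset) is an assumption, and `createGpsAggregator` is a hypothetical name.

```javascript
// Hypothetical aggregator: returns a push() function that emits {ts, lat, lon}
// once both coordinates have arrived, and null while one half is still missing.
function createGpsAggregator() {
    let pending = {}; // holds lat and/or lon until the pair is complete
    return function push(metric, value, ts) {
        if (metric !== "latitude" && metric !== "longitude") {
            throw new Error(`unexpected GPS metric: ${metric}`);
        }
        pending[metric === "latitude" ? "lat" : "lon"] = value;
        if (pending.lat === undefined || pending.lon === undefined) {
            return null;
        }
        const record = { ts, lat: pending.lat, lon: pending.lon };
        pending = {}; // start a fresh pair for the next fix
        return record;
    };
}
```

Emitting only complete pairs is what makes the record atomic: a consumer never sees a latitude combined with a longitude from an unrelated fix that it has to reconcile itself.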

Batch Processing Rules

Batches are closed when ANY of these limits is reached:

  1. Max Records: 5000 records per batch
  2. Max Bytes: 1 MB payload size
  3. Max Time Span: 5 seconds between first and last record

Batch interval is configurable (default: 5 seconds, range: 1-300 seconds). Higher intervals collect more records per batch for better compression but increase latency.
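
The three limits can be read as a single pass over time-ordered records that closes the current batch whenever adding the next record would break any limit. The sketch below shows that reading. It assumes records arrive sorted by `ts`, that 1 MB means 1024 * 1024 bytes, and that size is measured on the serialized records alone (the envelope is ignored); `splitIntoBatches` is a hypothetical name.

```javascript
// Limits from the list above; the exact byte value of "1 MB" is an assumption.
const DEFAULT_LIMITS = { maxRecords: 5000, maxBytes: 1024 * 1024, maxSpanMs: 5000 };

// Hypothetical helper: split time-ordered records into batches that respect all three limits.
function splitIntoBatches(records, limits = DEFAULT_LIMITS) {
    const batches = [];
    let current = [];
    let bytes = 0;
    for (const record of records) {
        const size = Buffer.byteLength(JSON.stringify(record), "utf8");
        const spanMs = current.length > 0
            ? Date.parse(record.ts) - Date.parse(current[0].ts)
            : 0;
        const wouldOverflow = current.length > 0 && (
            current.length + 1 > limits.maxRecords ||
            bytes + size > limits.maxBytes ||
            spanMs > limits.maxSpanMs
        );
        if (wouldOverflow) {
            batches.push(current); // close the batch before adding this record
            current = [];
            bytes = 0;
        }
        current.push(record);
        bytes += size;
    }
    if (current.length > 0) batches.push(current);
    return batches;
}
```

With the default limits, records stamped 0 s, 2 s, 6 s, and 7 s after the same start time end up in two batches, because the third record would stretch the first batch beyond five seconds.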

Grouping Rules

| Data Type | Grouping Key | Result |
|---|---|---|
| Sensor | {domain, org, site, subsystem, device} | All metrics for one device in one batch |
| Device status | {domain, org, site, subsystem, device} | Status records per device |
| GPS | {domain, org, site} | All GPS readings for the site |
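
The grouping keys in this table translate directly into a key function plus a map of buckets. This sketch relies on two assumptions that the document does not state outright: that a missing `domain` means `"sensors"`, and that GPS records are recognized by `device === "gps"`. The function names are hypothetical.

```javascript
// Hypothetical helper: derive the grouping key for one record.
function groupingKey(rec) {
    const domain = rec.domain ?? "sensors";
    if (domain === "sensors" && rec.device === "gps") {
        // GPS readings are grouped per site.
        return [domain, rec.org, rec.site].join("|");
    }
    return [domain, rec.org, rec.site, rec.subsystem ?? "", rec.device].join("|");
}

// Bucket records by grouping key, keeping arrival order inside each bucket.
function groupRecords(records) {
    const groups = new Map();
    for (const rec of records) {
        const key = groupingKey(rec);
        if (!groups.has(key)) groups.set(key, []);
        groups.get(key).push(rec);
    }
    return groups;
}
```

Note that `metric` is not part of the key, which is what lets temperature and pressure readings from the same device share one batch.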

Troubleshooting

Database Permission Errors

If you see "SQLITE_READONLY" errors:

# Ensure directory exists and is writable
mkdir -p /edge-data
chmod 755 /edge-data

# If database file exists with wrong permissions
sudo chown "$(id -un):$(id -gn)" /edge-data/outbox.db
chmod 644 /edge-data/outbox.db

MQTT Connection Failures

  1. Check broker URL format: Must include protocol (mqtt://, mqtts://, ws://, wss://)
  2. Verify credentials: Ensure username/password are correct
  3. Test connectivity: Use mosquitto_pub to verify broker is reachable
  4. Check TLS: For mqtts:// or wss://, ensure certificates are valid

Batches Not Sending

  1. Check outbox: Query database to see if records are accumulating
    sqlite3 /edge-data/outbox.db "SELECT COUNT(*) FROM outbox WHERE sent = 0;"
    
  2. Check debug output: Add a debug node after sensors to verify messages arrive
  3. Verify MQTT connection: Check if config node shows "connected" in Node-RED log
  4. Check batch interval: The internal timer runs at the configured batch interval (default: 5 seconds)

Docker Deployment

For containerized deployments:

services:
  nodered:
    image: nodered/node-red:latest
    volumes:
      - nodered-data:/data
      - ./edge-data:/edge-data
    environment:
      - ORG=acme_corp
      - SITE=factory_oslo
      - MQTT_BROKER=wss://mqtt.example.com:443
      - MQTT_USER=edge_user
      - MQTT_PASSWORD=secret
      - BATCH_SIZE=100
      - BATCH_INTERVAL=10

# Named volume referenced by the nodered service above
volumes:
  nodered-data:

All configuration properties support Node-RED's ${ENV_VAR} syntax for environment variable substitution.

Architecture

Data flow through the buffer node:

  1. Sensors (Modbus, GPS, etc.) send messages to the buffer node
  2. Transform -- incoming data is converted to ST1 format; GPS lat/lon are aggregated into atomic {ts, lat, lon}
  3. SQLite Outbox -- records are appended to the outbox table for offline resilience
  4. Batch Cycle -- a configurable timer (default: 5s) triggers batch processing
  5. Claim Records -- atomic UPDATE...RETURNING claims pending records
  6. Build Batches -- records are grouped by device; all metrics for one device go into one batch; 3-limit enforcement (records, bytes, time span)
  7. Poison Handling -- invalid records are moved to dead_letter table
  8. Sequence Assignment -- monotonic seq number per device via UPSERT
  9. Format ST1 -- final JSON batch payload is built
  10. MQTT Publish -- batch is published with QoS 1
  11. PUBACK -- broker confirms receipt
  12. Delete Records -- confirmed records are removed from outbox
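
The claim, publish, confirm, and delete steps can be simulated without SQLite or a broker. The sketch below uses an in-memory array in place of the outbox table and the documented `sent` states (0 = pending, 2 = inflight). `MemoryOutbox` and `runBatchCycle` are hypothetical names, and returning inflight rows to pending after a failed publish is an assumption about recovery behaviour.

```javascript
const PENDING = 0;
const INFLIGHT = 2;

// In-memory stand-in for the outbox table.
class MemoryOutbox {
    constructor() {
        this.rows = [];
        this.nextId = 1;
    }
    append(topic, payload) {
        this.rows.push({ id: this.nextId++, topic, payload, sent: PENDING });
    }
    // Mark up to `limit` pending rows as inflight and return them.
    claim(limit) {
        const claimed = this.rows.filter((r) => r.sent === PENDING).slice(0, limit);
        for (const row of claimed) row.sent = INFLIGHT;
        return claimed;
    }
    // Delete rows whose publish was acknowledged.
    confirm(ids) {
        this.rows = this.rows.filter((r) => !ids.includes(r.id));
    }
    // Put rows back to pending so a later cycle retries them.
    release(ids) {
        for (const row of this.rows) {
            if (ids.includes(row.id)) row.sent = PENDING;
        }
    }
}

// One batch cycle: `publish` must return a promise that resolves only after
// the broker has acknowledged the message (PUBACK for QoS 1).
async function runBatchCycle(outbox, publish, batchSize = 100) {
    const claimed = outbox.claim(batchSize);
    if (claimed.length === 0) return 0;
    const ids = claimed.map((r) => r.id);
    try {
        await publish(claimed);
        outbox.confirm(ids);
        return claimed.length;
    } catch (err) {
        outbox.release(ids); // nothing is deleted without an acknowledgement
        return 0;
    }
}
```

The ordering is what provides offline resilience: rows leave the outbox only after the acknowledgement, so a dropped connection leaves the data in place for a later cycle.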

License

Saxe Noncommercial No-Derivatives License. See LICENSE. Commercial use is permitted only for Saxe and Saxe Customers as defined in the license. Noncommercial users may use and distribute unmodified copies only.

Changelog

See CHANGELOG.md for version history.


Saxe Edge - Reliable IoT data collection at the edge.
