@freol35241/nodered-contrib-zenoh 0.3.0
Node-RED nodes for Eclipse Zenoh - a pub/sub, query/queryable protocol that unifies data in motion, data at rest, and computational tasks
nodered-contrib-zenoh
Node-RED nodes for Eclipse Zenoh integration, providing seamless pub/sub, query/queryable, and liveliness functionality.
https://flows.nodered.org/node/@freol35241/nodered-contrib-zenoh
Overview
This package provides Node-RED nodes to interact with Eclipse Zenoh, a "Zero Overhead Pub/sub, Store/Query, and Compute" protocol that unifies data in motion, data at rest, and computational tasks. It also includes support for liveliness tokens to monitor and discover active entities on the network.
Installation
npm install @freol35241/nodered-contrib-zenoh
Requirements
- Node.js: Version 16.x or higher
- WASM Support: The zenoh-ts library uses WebAssembly modules for key expression parsing. Node.js must be configured to load WASM files.
- WebSocket: Automatically provided via the ws package for Node.js compatibility.
Enabling WASM Support
Node-RED must be started with WASM module support enabled. Choose one of these methods:
Method 1: Environment Variable (Recommended)
Set the NODE_OPTIONS environment variable before starting Node-RED:
export NODE_OPTIONS="--experimental-wasm-modules --no-warnings"
node-red
To make this permanent, add it to your shell profile (~/.bashrc, ~/.zshrc, etc.):
echo 'export NODE_OPTIONS="--experimental-wasm-modules --no-warnings"' >> ~/.bashrc
source ~/.bashrc
Method 2: Direct Node.js Flags
Start Node-RED directly with the required flags:
node --experimental-wasm-modules --no-warnings $(which node-red)
Or if you have Node-RED installed locally:
node --experimental-wasm-modules --no-warnings node_modules/node-red/red.js
Method 3: Node-RED Settings File
Add the following check to your Node-RED settings.js file (usually located at ~/.node-red/settings.js). Note that it only logs a warning when the flag is missing from the command line; it does not enable WASM support by itself, so combine it with Method 1 or Method 2:
// At the top of the file, before module.exports
if (!process.execArgv.includes('--experimental-wasm-modules')) {
    console.log('Note: WASM support not enabled. Some Zenoh features may not work.');
    console.log('Consider setting NODE_OPTIONS="--experimental-wasm-modules --no-warnings"');
}
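The check above inspects only process.execArgv, which, as far as the Node.js documentation describes it, lists command-line flags and not flags supplied through NODE_OPTIONS, so it may warn even when Method 1 is in use. A minimal sketch of a check that looks at both sources follows; wasmFlagEnabled is a hypothetical helper name, not part of Node-RED or this package.

```javascript
// Hypothetical helper (not part of Node-RED or this package).
// Returns true when the flag appears on the command line or in NODE_OPTIONS.
function wasmFlagEnabled(execArgv, nodeOptions) {
    const flag = '--experimental-wasm-modules';
    // NODE_OPTIONS is a space-separated string; it may be undefined.
    const envFlags = (nodeOptions || '').split(/\s+/).filter(Boolean);
    return execArgv.includes(flag) || envFlags.includes(flag);
}

if (!wasmFlagEnabled(process.execArgv, process.env.NODE_OPTIONS)) {
    console.log('Note: WASM support not enabled. Some Zenoh features may not work.');
}
```

Like the original snippet, this only reports the problem; the flag itself still has to be set when the Node.js process starts.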
Docker/Container Environments
If running Node-RED in Docker, set the environment variable in your docker-compose.yml:
services:
  node-red:
    image: nodered/node-red:latest
    environment:
      - NODE_OPTIONS=--experimental-wasm-modules --no-warnings
Or pass it with docker run:
docker run -e NODE_OPTIONS="--experimental-wasm-modules --no-warnings" -p 1880:1880 nodered/node-red
Note: If you see an error message like Unknown file extension ".wasm", it means WASM support is not enabled. Follow one of the methods above to enable it.
Prerequisites
You need a running Zenoh router with the zenoh-plugin-remote-api WebSocket plugin enabled:
# Install zenohd
cargo install zenohd
# Run with WebSocket support
zenohd --ws-port 10000
Or use Docker:
docker run -p 10000:10000 eclipse/zenoh --ws-port 10000
Docker Compose Setup
For a complete setup with both Node-RED and Zenoh running together, use this Docker Compose configuration.
IMPORTANT: The eclipse/zenoh Docker image does NOT include the remote-api plugin by default. The plugin must be downloaded separately and mounted into the container. This is the same approach used in our integration tests.
version: '3.8'
services:
  # Zenoh router with remote-api WebSocket plugin
  zenoh-router:
    image: eclipse/zenoh:1.6.2
    container_name: zenoh-router
    ports:
      - "7447:7447"   # Zenoh peer/router communication
      - "8000:8000"   # REST API
      - "10000:10000" # WebSocket (remote-api plugin)
    volumes:
      # CRITICAL: Mount the remote-api plugin
      # The plugin must be downloaded separately (see setup instructions below)
      - ./zenoh_plugins:/root/.zenoh
    command: >
      --cfg='mode:"router"'
      --cfg='listen:["tcp/0.0.0.0:7447"]'
      --cfg='plugins/rest/http_port:"0.0.0.0:8000"'
      --cfg='plugins/remote_api/websocket_port:"0.0.0.0:10000"'
    restart: unless-stopped
    networks:
      - zenoh-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/@/router/local"]
      interval: 10s
      timeout: 5s
      retries: 5
  # Node-RED with Zenoh nodes
  node-red:
    image: nodered/node-red:latest
    container_name: node-red
    ports:
      - "1880:1880" # Node-RED UI
    environment:
      # CRITICAL: Enable WASM module support for zenoh-ts
      - NODE_OPTIONS=--experimental-wasm-modules --no-warnings
      # Optional: Set timezone
      - TZ=UTC
    volumes:
      # Persist Node-RED data
      - node-red-data:/data
    depends_on:
      zenoh-router:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - zenoh-network
networks:
  zenoh-network:
    driver: bridge
volumes:
  node-red-data:
Setup Instructions:
Download the docker-compose.yml file:
# Create a directory for your deployment
mkdir zenoh-nodered && cd zenoh-nodered
# Download docker-compose.yml from the examples folder
curl -O https://raw.githubusercontent.com/freol35241/nodered-contrib-zenoh/main/examples/docker-compose.yml
CRITICAL: Download the Zenoh remote-api plugin for your platform:
The zenoh-plugin-remote-api is required for WebSocket connectivity but is NOT included in the Docker image by default.
Steps:
a. Browse to the Eclipse Zenoh plugin repository: https://download.eclipse.org/zenoh/zenoh-plugin-remote-api/
b. Navigate to version 1.6.2/ (must match the Zenoh router version)
c. Download the appropriate standalone build for your host architecture:
The plugin must match the container OS (Linux) and your host CPU architecture.
Check your architecture with uname -m (returns x86_64 or aarch64/arm64):
- x86_64 hosts (Intel/AMD Linux, Intel Mac): zenoh-ts-1.6.2-x86_64-unknown-linux-musl-standalone.zip
- ARM64 hosts (ARM Linux, Raspberry Pi, Apple Silicon Mac): zenoh-ts-1.6.2-aarch64-unknown-linux-musl-standalone.zip
d. Extract the plugin to the correct location:
mkdir -p zenoh_plugins/lib
cd zenoh_plugins/lib
# Extract your downloaded plugin zip file here
unzip ~/Downloads/zenoh-ts-1.6.2-*-standalone.zip
# Verify the plugin library file exists
ls -la
# You should see: libzenoh_plugin_remote_api.so
cd ../..
Important: Since the eclipse/zenoh container is Linux, always use the *-linux-musl-standalone.zip plugin. Choose x86_64 or aarch64 based on your host's CPU architecture.
Start the services:
docker-compose up -d
Install the Zenoh nodes in Node-RED:
- Open Node-RED at http://localhost:1880
- Go to Menu (☰) → Manage palette → Install
- Search for @freol35241/nodered-contrib-zenoh
- Click Install
Configure the Zenoh Session node:
- Drag a Zenoh node into your flow
- Configure the session with locator: ws://zenoh-router:10000
- Note: Use the service name zenoh-router instead of localhost for inter-container communication
Import example flows:
- Go to Menu (☰) → Import → Examples
- Navigate to @freol35241/nodered-contrib-zenoh
- Select a flow to try
Verify the setup:
# Check that both services are running
docker-compose ps
# Test Zenoh REST API
curl http://localhost:8000/@/router/local
# View logs
docker-compose logs zenoh-router
docker-compose logs node-red
Connecting external clients:
From your host machine or other containers, you can connect to:
- Zenoh WebSocket: ws://localhost:10000
- Zenoh TCP: tcp/localhost:7447
- Zenoh REST API: http://localhost:8000
- Node-RED UI: http://localhost:1880
Troubleshooting:
If you encounter WASM-related errors in Node-RED:
- Check the Node-RED logs: docker-compose logs node-red
- Verify NODE_OPTIONS is set: docker-compose exec node-red env | grep NODE_OPTIONS
- Restart Node-RED if needed: docker-compose restart node-red
If Zenoh connection fails:
- Verify the plugin was downloaded: Check that zenoh_plugins/lib/ contains the plugin files
ls -la zenoh_plugins/lib/
# You should see libzenoh_plugin_remote_api.so or similar
- Check Zenoh router logs for plugin loading:
docker-compose logs zenoh-router | grep -i plugin
docker-compose logs zenoh-router | grep -i remote_api
- Verify the router is healthy: docker-compose ps
- Check WebSocket port is accessible: docker-compose exec node-red nc -zv zenoh-router 10000
- Test the REST API: curl http://localhost:8000/@/router/local
If the plugin isn't loading, ensure:
- The zenoh_plugins directory exists and contains the extracted plugin in the lib/ subdirectory
- The volume mount path matches: ./zenoh_plugins:/root/.zenoh
- You downloaded the Linux build for your host architecture (x86_64-unknown-linux-musl or aarch64-unknown-linux-musl)
Nodes
Zenoh Session (Configuration Node)
Configuration node that manages the Zenoh session connection.
Configuration:
- Locator: WebSocket URL to the Zenoh router (e.g., ws://localhost:10000)
Zenoh Subscribe
Subscribes to a Zenoh key expression and outputs received samples.
Configuration:
- Session: The Zenoh session configuration
- Key Expression: The key expression to subscribe to (supports wildcards)
- Name: Optional node name
Outputs:
{
    payload: <Buffer>,               // The payload as raw bytes (see Payload Handling)
    topic: "demo/example/key",       // The key expression
    zenoh: {
        keyExpr: "demo/example/key",
        encoding: "application/json",
        kind: 0,                     // Sample kind
        timestamp: {...},            // Optional timestamp
        priority: 5,
        congestionControl: 0,
        express: false,
        attachment: <optional-data>  // Optional attachment
    }
}
Key Expression Wildcards:
- * matches a single chunk (e.g., demo/*/key matches demo/example/key)
- ** matches multiple chunks (e.g., demo/** matches demo/a/b/c)
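To make the wildcard rules concrete, here is a simplified matcher written for illustration only. keyMatches is a hypothetical function, not the matcher used by Zenoh or zenoh-ts; it assumes chunks are separated by "/", that ** can stand for zero or more chunks, and it ignores any other key expression syntax.

```javascript
// Simplified illustration of '*' and '**' matching (hypothetical helper).
// Assumption: '**' matches zero or more whole chunks, '*' exactly one chunk.
function keyMatches(pattern, key) {
    const p = pattern.split('/');
    const k = key.split('/');

    function match(i, j) {
        if (i === p.length) {
            return j === k.length; // pattern consumed: key must be consumed too
        }
        if (p[i] === '**') {
            // Try letting '**' absorb 0, 1, 2, ... of the remaining chunks.
            for (let n = j; n <= k.length; n++) {
                if (match(i + 1, n)) return true;
            }
            return false;
        }
        if (j === k.length) {
            return false; // key exhausted but pattern still expects a chunk
        }
        if (p[i] === '*' || p[i] === k[j]) {
            return match(i + 1, j + 1);
        }
        return false;
    }

    return match(0, 0);
}

console.log(keyMatches('demo/*/key', 'demo/example/key'));
console.log(keyMatches('demo/**', 'demo/a/b/c'));
console.log(keyMatches('demo/*/key', 'demo/a/b/key'));
```

The first two calls reproduce the examples from the list above; the third shows that a single * does not span two chunks.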
Zenoh Put
Publishes data to a Zenoh key expression.
Configuration:
- Session: The Zenoh session configuration
- Key Expression: Default key expression (can be overridden by message)
- Force Key Expression: When checked, always use the configured key expression and ignore msg.keyExpr and msg.topic
- Encoding: Default encoding type for payloads
- Priority: Default message priority (1-7)
- Congestion Control: How to handle network congestion (Drop or Block)
- Reliability: Best Effort or Reliable delivery
- Express: Enable express mode for lower latency
- Allowed Destination: Restrict message delivery scope
- Name: Optional node name
Key Expression Behavior:
The node determines which key expression to use with the following logic:
Default (Force Key Expression unchecked):
- Use msg.keyExpr if present
- Otherwise, use msg.topic if present
- Otherwise, use the configured Key Expression
Forced (Force Key Expression checked):
- Always use the configured Key Expression
- Ignore msg.keyExpr and msg.topic even if present
- Useful for ensuring a fixed key expression regardless of message content
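The precedence rules above can be summarised in a few lines of JavaScript. This is a sketch of the documented behaviour, not the node's actual source; resolveKeyExpr and its parameter names are hypothetical.

```javascript
// Sketch of the documented precedence (hypothetical helper, not the node's code).
function resolveKeyExpr(msg, configuredKeyExpr, forceKeyExpr) {
    if (forceKeyExpr) {
        return configuredKeyExpr; // forced: message fields are ignored
    }
    // Default: msg.keyExpr, then msg.topic, then the configured value.
    return msg.keyExpr || msg.topic || configuredKeyExpr;
}

console.log(resolveKeyExpr({ keyExpr: 'a/b', topic: 'c/d' }, 'cfg/key', false));
console.log(resolveKeyExpr({ topic: 'c/d' }, 'cfg/key', false));
console.log(resolveKeyExpr({ keyExpr: 'a/b' }, 'cfg/key', true));
```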
Inputs:
{
payload: <any-data>, // Required: data to publish
keyExpr: "demo/example/key", // Optional: overrides configured key (unless forced)
topic: "demo/example/key", // Alternative to keyExpr (unless forced)
encoding: "application/json", // Optional
priority: 5, // Optional
congestionControl: 0, // Optional
express: false, // Optional
reliability: 0, // Optional
attachment: <extra-data> // Optional
}
Zenoh Query
Issues queries to Zenoh queryables and collects replies.
Configuration:
- Session: The Zenoh session configuration
- Selector: Default selector (key expression + parameters)
- Timeout: Query timeout in milliseconds (default: 10000)
- Name: Optional node name
Inputs:
{
selector: "demo/example/**?arg=val", // Optional: overrides configured selector
topic: "demo/example/**", // Alternative to selector
payload: <query-data>, // Optional: payload for the query
encoding: "application/json", // Optional
timeout: 5000, // Optional: timeout in ms
target: 0, // Optional: query target
consolidation: 0, // Optional
attachment: <extra-data> // Optional
}
Outputs:
{
payload: [ // Array of replies
{
payload: <reply-data>,
topic: "demo/example/key",
zenoh: {
keyExpr: "demo/example/key",
encoding: "application/json",
kind: 0,
timestamp: {...},
type: "sample" // or "error"
}
},
...
]
}
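Because the output is an array that can mix sample and error replies, a function node after zenoh-query will often want to separate the two. The sketch below assumes the reply shape shown above and that each reply payload is a Buffer (as described under Payload Handling); splitReplies is a hypothetical helper name.

```javascript
// Hypothetical helper: separate sample replies from error replies.
// Assumes each reply looks like { payload, topic, zenoh: { type } }.
function splitReplies(replies) {
    const samples = [];
    const errors = [];
    for (const reply of replies) {
        const text = Buffer.isBuffer(reply.payload)
            ? reply.payload.toString('utf8')
            : String(reply.payload);
        if (reply.zenoh && reply.zenoh.type === 'error') {
            errors.push(text);
        } else {
            samples.push({ key: reply.topic, text: text });
        }
    }
    return { samples: samples, errors: errors };
}

// Example input shaped like the zenoh-query output above.
const exampleReplies = [
    { payload: Buffer.from('21.5'), topic: 'demo/example/a', zenoh: { type: 'sample' } },
    { payload: Buffer.from('not available'), topic: 'demo/example/b', zenoh: { type: 'error' } }
];
console.log(splitReplies(exampleReplies));
```

Inside a function node this would typically be used as msg.payload = splitReplies(msg.payload); return msg;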
Zenoh Queryable
Responds to Zenoh queries on a key expression.
Configuration:
- Session: The Zenoh session configuration
- Key Expression: The key expression to handle queries for
- Name: Optional node name
Outputs (when query received):
{
payload: <query-payload>, // Query payload (if any)
topic: "demo/example/key", // Query key expression
queryId: "abc123", // Unique query ID (required for replies)
zenoh: {
keyExpr: "demo/example/key",
parameters: "arg1=val1;arg2=val2", // Query parameters
selector: "demo/**?arg=val", // Full selector
encoding: "application/json", // Optional
attachment: <extra-data> // Optional
}
}
Inputs (to send replies):
// Send a normal reply
{
queryId: "abc123", // Required: from query output
payload: <reply-data>, // Required: reply data
keyExpr: "demo/example/key", // Required: key for reply
topic: "demo/example/key", // Alternative to keyExpr
encoding: "application/json", // Optional
attachment: <extra-data> // Optional
}
// Send an error reply
{
queryId: "abc123", // Required
error: true, // Marks as error reply
payload: "Error message", // Error description
encoding: "text/plain" // Optional
}
// Finalize query (no more replies)
{
queryId: "abc123", // Required
finalize: true // Signals completion
}
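The three message shapes above can be combined into one small helper that answers a query with either a normal reply or an error reply, followed by a finalize message. This is a sketch under the assumption that the node accepts these messages in sequence; buildQueryableMessages is a hypothetical name and the lookup value is illustrative.

```javascript
// Hypothetical helper: build the messages to feed back into zenoh-queryable.
// 'value' is the data to reply with; undefined means nothing was found.
function buildQueryableMessages(queryMsg, value) {
    const messages = [];
    if (value === undefined) {
        messages.push({
            queryId: queryMsg.queryId,
            error: true,
            payload: 'No data for ' + queryMsg.topic,
            encoding: 'text/plain'
        });
    } else {
        messages.push({
            queryId: queryMsg.queryId,
            keyExpr: queryMsg.topic,
            payload: value,
            encoding: 'application/json'
        });
    }
    // Always signal that no more replies will follow.
    messages.push({ queryId: queryMsg.queryId, finalize: true });
    return messages;
}

const exampleQuery = { queryId: 'abc123', topic: 'demo/example/key' };
console.log(buildQueryableMessages(exampleQuery, { temp: 20 }));
console.log(buildQueryableMessages(exampleQuery, undefined));
```

In a function node, returning [buildQueryableMessages(msg, value)] sends the messages one after another on the first output.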
Zenoh Liveliness Token
Declares a liveliness token to indicate presence on the Zenoh network.
Configuration:
- Session: The Zenoh session configuration
- Key Expression: The key expression for the token (e.g., robot/status/robot1)
- Auto-start: When enabled (default), the token is automatically declared on node startup
- Name: Optional node name
Inputs (for manual control):
{
action: "declare" // Declare the token
}
{
action: "undeclare" // Remove the token
}
Outputs:
{
payload: "declared", // or "undeclared"
topic: "robot/status/robot1", // The token's key expression
zenoh: {
keyExpr: "robot/status/robot1",
type: "liveliness-token"
}
}
Details:
- Liveliness tokens are used to advertise presence on the network
- Other nodes can query for or subscribe to these tokens to discover active entities
- Tokens are automatically undeclared when the node is stopped or redeployed
- With auto-start enabled, the token is declared immediately on flow deployment
- With auto-start disabled, send msg.action = "declare" to manually declare the token
Zenoh Liveliness Subscribe
Subscribes to liveliness token changes and outputs events when tokens appear or disappear.
Configuration:
- Session: The Zenoh session configuration
- Key Expression: The key expression pattern to monitor (supports wildcards)
- History: When enabled (default), receive PUT events for tokens that are already alive
- Name: Optional node name
Outputs:
{
payload: {
alive: true, // true for PUT, false for DELETE
keyExpr: "robot/status/robot1" // The token that changed
},
topic: "robot/status/robot1", // The token's key expression
zenoh: {
keyExpr: "robot/status/robot1",
kind: 0, // 0 = PUT (alive), 1 = DELETE (gone)
timestamp: {...}, // Optional timestamp
type: "liveliness-change"
}
}
Key Expression Wildcards:
- * matches a single chunk (e.g., robot/*/status matches robot/robot1/status)
- ** matches multiple chunks (e.g., robot/** matches all tokens under robot/)
Sample Kinds:
- PUT (kind = 0): A token was declared (entity came online)
- DELETE (kind = 1): A token was undeclared (entity went offline)
History Behavior:
- With history: true (default): Immediately receive PUT events for all currently alive tokens matching the pattern, then receive updates as tokens appear/disappear
- With history: false: Only receive events for changes that occur after the subscription starts
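With history enabled, the stream of events is enough to maintain a complete picture of which tokens are alive. A minimal sketch of that bookkeeping follows; applyLivelinessEvent is a hypothetical helper, and the messages are hand-written examples in the output format shown above.

```javascript
// Hypothetical helper: keep a Set of alive key expressions up to date.
function applyLivelinessEvent(online, msg) {
    const key = msg.payload.keyExpr;
    if (msg.payload.alive) {
        online.add(key);    // PUT: token declared
    } else {
        online.delete(key); // DELETE: token undeclared
    }
    return online;
}

// Hand-written example events (not captured from a real router).
const online = new Set();
applyLivelinessEvent(online, { payload: { alive: true, keyExpr: 'robot/status/robot1' } });
applyLivelinessEvent(online, { payload: { alive: true, keyExpr: 'robot/status/robot2' } });
applyLivelinessEvent(online, { payload: { alive: false, keyExpr: 'robot/status/robot1' } });
console.log(Array.from(online));
```

In a function node the Set would need to persist between messages, for example by storing its contents in flow context with flow.get and flow.set.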
Zenoh Liveliness Get
Queries for currently alive liveliness tokens (one-time snapshot).
Configuration:
- Session: The Zenoh session configuration
- Key Expression: Default key expression pattern to query
- Timeout: Query timeout in milliseconds (default: 10000)
- Name: Optional node name
Inputs:
{
keyExpr: "robot/**", // Optional: overrides configured pattern
topic: "robot/**", // Alternative to keyExpr
timeout: 5000 // Optional: timeout in ms
}
Outputs:
{
payload: [ // Array of alive tokens
{
keyExpr: "robot/status/robot1",
timestamp: {...} // Optional
},
{
keyExpr: "robot/status/robot2",
timestamp: {...}
}
],
count: 2 // Number of tokens found
}
Use Cases:
- Discover which services or devices are currently online
- Health checks to verify active components
- Initial state discovery before subscribing to changes
- Periodic polling for presence information
Difference from Subscribe:
- Get: One-time snapshot of current state when triggered
- Subscribe: Continuous monitoring with events for each change
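As an illustration of the health-check use case, the snapshot returned by the Get node can be compared against a list of tokens that are expected to be alive. missingTokens and the expected list are hypothetical; the message shape follows the Outputs example above.

```javascript
// Hypothetical helper: which expected tokens are absent from the snapshot?
function missingTokens(expectedKeys, msg) {
    const alive = new Set(msg.payload.map(function (token) {
        return token.keyExpr;
    }));
    return expectedKeys.filter(function (key) {
        return !alive.has(key);
    });
}

// Example snapshot shaped like the zenoh-liveliness-get output.
const snapshot = {
    payload: [
        { keyExpr: 'robot/status/robot1' },
        { keyExpr: 'robot/status/robot2' }
    ],
    count: 2
};
const expected = ['robot/status/robot1', 'robot/status/robot2', 'robot/status/robot3'];
console.log(missingTokens(expected, snapshot));
```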
Payload Handling
All Zenoh nodes use raw bytes (Buffer) for payload transport. This provides a predictable, transparent approach that aligns with Zenoh's fundamentally binary protocol.
How It Works
Sending (zenoh-put):
- All payload types are automatically converted to Buffer (raw bytes) before transmission
- The conversion preserves data integrity while ensuring compatibility with Zenoh's binary transport
Receiving (zenoh-subscribe, zenoh-query, zenoh-queryable):
- Payloads are always received as Node.js Buffer objects
- The Node-RED debug window displays Buffers showing both hex and ASCII interpretation
- Use standard Node-RED nodes to convert Buffers to your desired format
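The conversion on the sending side can be pictured roughly as follows. This is a sketch based on the rules described in this section (strings as UTF-8, objects as JSON text, numbers as their string form), not the package's actual implementation; toBuffer is a hypothetical name.

```javascript
// Sketch of the documented payload-to-Buffer rules (hypothetical helper).
function toBuffer(payload) {
    if (Buffer.isBuffer(payload)) {
        return payload;                       // already raw bytes
    }
    if (payload instanceof Uint8Array) {
        return Buffer.from(payload);          // copy the bytes
    }
    if (typeof payload === 'string') {
        return Buffer.from(payload, 'utf8');
    }
    if (typeof payload === 'object' && payload !== null) {
        return Buffer.from(JSON.stringify(payload), 'utf8');
    }
    return Buffer.from(String(payload), 'utf8'); // numbers, booleans, ...
}

console.log(toBuffer('hello'));
console.log(toBuffer({ temp: 20, unit: 'C' }).toString('utf8'));
console.log(toBuffer(42).toString('utf8'));
```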
Common Scenarios
Sending and Receiving Strings
Flow:
[inject: "hello"] → [zenoh-put] → Zenoh → [zenoh-subscribe] → [debug]
On the receiving side:
- Debug shows: Buffer <68 65 6c 6c 6f> (with ASCII preview: "hello")
- To use as string, add a function node:
msg.payload = msg.payload.toString('utf8');
return msg;
Or use a change node: Set msg.payload to expression: $string(msg.payload)
Sending and Receiving JSON
Flow:
[inject: {temp: 20, unit: "C"}] → [zenoh-put] → Zenoh → [zenoh-subscribe] → [function] → [debug]
zenoh-put automatically:
- Converts objects to JSON string: {"temp":20,"unit":"C"}
- Encodes as UTF-8 bytes
On the receiving side, use a function node:
// Convert Buffer to string, then parse JSON
msg.payload = JSON.parse(msg.payload.toString('utf8'));
// Now msg.payload is {temp: 20, unit: "C"}
return msg;
Or chain nodes:
[zenoh-subscribe] → [buffer-to-string] → [JSON parse] → [debug]
Sending and Receiving Numbers
Sending:
[inject: 42] → [zenoh-put]
- Number is converted to string "42", then to UTF-8 bytes
Receiving:
// In a function node after zenoh-subscribe
msg.payload = Number(msg.payload.toString('utf8'));
return msg;
Or use a change node: Set msg.payload to expression: $number($string(msg.payload))
Sending and Receiving Binary Data
Sending binary data directly:
// In a function node before zenoh-put
msg.payload = Buffer.from([0x01, 0x02, 0x03, 0x04]);
return msg;
Receiving:
- The Buffer is already in binary form; use it directly or process its bytes
Encoding sensor data example:
// Sending: Pack temperature (float) and humidity (int)
const buffer = Buffer.allocUnsafe(8);
buffer.writeFloatLE(23.5, 0); // Temperature at offset 0
buffer.writeInt32LE(65, 4); // Humidity at offset 4
msg.payload = buffer;
return msg;
// Receiving: Unpack the data
const temp = msg.payload.readFloatLE(0);
const humidity = msg.payload.readInt32LE(4);
msg.payload = { temperature: temp, humidity: humidity };
return msg;
Working with Typed Arrays
Sending:
// In a function node
const data = new Uint8Array([10, 20, 30, 40, 50]);
msg.payload = Buffer.from(data);
return msg;
Receiving:
// Convert Buffer back to Uint8Array if needed
const typedArray = new Uint8Array(msg.payload);
msg.payload = Array.from(typedArray); // [10, 20, 30, 40, 50]
return msg;
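One caveat applies to typed arrays with elements wider than one byte. Buffer.from(typedArray) copies element values, each truncated to a single byte, so a Float32Array would lose its data. A sketch of one way to preserve the underlying bytes is shown below; toFloat32Array is a hypothetical helper name.

```javascript
// Sending: wrap the typed array's underlying bytes (shares memory, no truncation).
const floats = new Float32Array([1.5, -2.25]);
const bytes = Buffer.from(floats.buffer, floats.byteOffset, floats.byteLength);

// Receiving: copy into a fresh ArrayBuffer first so the float view starts
// at offset 0, then reinterpret the bytes as 32-bit floats.
function toFloat32Array(buf) {
    const copy = new Uint8Array(buf);
    return new Float32Array(copy.buffer, 0, copy.byteLength / 4);
}

console.log(bytes.length);
console.log(Array.from(toFloat32Array(bytes)));
```

The byte order is that of the sending machine, so both ends need to agree on it; the readFloatLE and writeFloatLE calls used earlier make the order explicit.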
Best Practices
Use encoding hints: Set msg.encoding when sending to help receivers:
msg.encoding = "application/json"; // For JSON data
msg.encoding = "text/plain"; // For plain text
msg.encoding = "application/octet-stream"; // For binary data
Create reusable conversion flows: Build subflows for common conversions:
- Buffer → JSON Object
- Buffer → String
- Number → Buffer
- JSON Object → Buffer
Check the debug window: When troubleshooting, the debug window shows both:
- Hex representation: <48 65 6c 6c 6f>
- ASCII interpretation: "Hello" (when printable)
Handle errors gracefully: Always wrap JSON parsing in try-catch:
try {
    msg.payload = JSON.parse(msg.payload.toString('utf8'));
} catch (e) {
    node.error("Invalid JSON: " + e.message);
    return null;
}
return msg;
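The encoding hint and the error-handling advice can be combined on the receiving side. The sketch below picks a conversion based on the encoding string and falls back to the raw Buffer when parsing fails; decodeByEncoding is a hypothetical helper, and it assumes the encoding arrives as a plain string such as the values listed above.

```javascript
// Hypothetical helper: decode a Buffer according to its encoding hint.
function decodeByEncoding(buf, encoding) {
    const hint = typeof encoding === 'string' ? encoding : '';
    if (hint.startsWith('application/json')) {
        try {
            return JSON.parse(buf.toString('utf8'));
        } catch (e) {
            return buf; // invalid JSON: hand back the raw bytes
        }
    }
    if (hint.startsWith('text/')) {
        return buf.toString('utf8');
    }
    return buf; // unknown or binary encoding: leave as Buffer
}

console.log(decodeByEncoding(Buffer.from('{"temp":20}'), 'application/json'));
console.log(decodeByEncoding(Buffer.from('hello'), 'text/plain'));
console.log(decodeByEncoding(Buffer.from([1, 2, 3]), 'application/octet-stream'));
```

In a function node after zenoh-subscribe, this might be called as msg.payload = decodeByEncoding(msg.payload, msg.zenoh.encoding); return msg;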
Why Raw Bytes?
This approach provides several advantages:
- Transparency: No hidden conversions or magic—you always know you're working with bytes
- Flexibility: Use Node-RED's rich ecosystem of conversion nodes
- Compatibility: Works naturally with Zenoh's binary protocol
- Performance: No unnecessary serialization/deserialization overhead
- Debuggability: Node-RED's debug window shows Buffer contents clearly
Usage Examples
Simple Pub/Sub
[inject] --> [zenoh-put]
(key: demo/example)
[zenoh-subscribe] --> [debug]
(key: demo/example)
Query/Queryable
[inject] --> [zenoh-query] --> [debug]
(selector: demo/data/**)
[zenoh-queryable] --> [function] --> [zenoh-queryable]
(key: demo/data/**)   (prepare       (loop back to send
                       reply)         replies)
Example function node for queryable:
// Prepare reply
msg.keyExpr = msg.topic;
msg.payload = { response: "data", timestamp: Date.now() };
return msg;
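To finalize after sending reply:
// Send reply
var reply = {
    queryId: msg.queryId,
    keyExpr: msg.topic,
    payload: { data: "value" }
};
// Send finalize
var finalize = {
    queryId: msg.queryId,
    finalize: true
};
// Requires two outputs on the function node: reply goes to output 1, finalize to output 2.
// To send both in order on a single output, return [[reply, finalize]] instead.
return [[reply], [finalize]];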
Liveliness - Presence Detection
Scenario: Monitor which robots are online in a fleet.
[zenoh-liveliness-token]
(key: fleet/robot/robot1, auto-start: true)
[zenoh-liveliness-token]
(key: fleet/robot/robot2, auto-start: true)
[zenoh-liveliness-subscribe] --> [function] --> [debug]
(key: fleet/robot/**) (parse event)
Function node to parse liveliness events:
// Extract useful information
const event = {
robot: msg.payload.keyExpr.split('/').pop(), // Extract robot ID
status: msg.payload.alive ? 'online' : 'offline',
timestamp: new Date().toISOString()
};
msg.payload = event;
return msg;
One-time discovery: Query for currently online robots:
[inject] --> [zenoh-liveliness-get] --> [function] --> [debug]
(key: fleet/robot/**) (format list)
Function node to format the results:
// Convert array of tokens to robot list
const onlineRobots = msg.payload.map(token => {
return token.keyExpr.split('/').pop(); // Extract robot ID
});
msg.payload = {
count: msg.count,
robots: onlineRobots
};
return msg;
// Output: { count: 2, robots: ['robot1', 'robot2'] }
Manual token control:
[inject] --> [zenoh-liveliness-token]
(key: fleet/robot/robot3, auto-start: false)
// Inject: { action: "declare" } to come online
// Inject: { action: "undeclare" } to go offline
Development
Running Tests
The package includes comprehensive unit and integration tests:
npm test
Note: Integration tests require a running Zenoh router with WebSocket support on ws://localhost:10000.
Test Coverage
Tests cover:
- Basic node configuration and loading
- Put/Subscribe message flow
- Query/Queryable interactions
- Liveliness token declaration and querying
- Liveliness change monitoring (subscribe)
- Wildcard subscriptions
- Parameter passing
- Multiple replies
- Error handling
API Reference
This package uses zenoh-ts, the TypeScript/JavaScript bindings for Eclipse Zenoh.
For detailed API documentation, see:
License
Apache License 2.0
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and questions: