node-red-syslog 1.1.2
Node-RED node to read a Redis stream with conditional acknowledgments.
redis-syslog-consumer
⚙️ Prerequisites
- Node-RED v2.0 or higher
- Node.js v14 or higher
- Redis v5.0 or higher (Streams support required)
- A reachable Redis server from the Node-RED runtime environment
Node-RED node and config module for consuming Redis streams reliably using consumer groups, with back‑pressure, retry, and dynamic concurrency control based on system load.
📦 Installation
Add the package to your Node-RED project (in the root folder):
npm install redis-syslog-consumer
Restart Node-RED to load the new nodes.
🚀 Usage
Once installed, you’ll have two new nodes in the palette under Syslog and config:
- Redis Syslog Consumer (
redis-syslog-consumer
) - Resource Manager (
redis-syslog-resource-manager
)
Redis Syslog Consumer Node
- Inputs: 1
- Outputs: 2 (output, logs)
Node Properties
Property | Type | Description |
---|---|---|
Name | String | Optional label |
Redis URL | String | Connection URI (redis://host:port ) |
Stream Key | String | Redis stream name (e.g. syslog ) |
Resource Manager | Config | Select your redis-syslog-resource-manager node |
Behavior
- On deploy, it creates a
StreamProcessor
with:- A Redis client connection
- A consumer‐group named
grp:<stream>
- A random consumer name
name:<id>
- Initialization:
- Creates or reuses the Redis consumer group
- Drains existing backlog (XREADGROUP from
0
) - Claims stale pending entries (older than 5 minutes)
- Starts a continuous poll (BLOCK 30s) for new messages
- Message flow:
- Each entry becomes a Node-RED message payload (
msg.payload
) with metadata (msg._redisId
). - The developer must re-inject an object
{ _redisId, _redisAck: true }
into the node to ACK/DEL from the stream.
- Each entry becomes a Node-RED message payload (
- Logging: any lifecycle events, errors, or debug can be emitted to the second output.
Resource Manager Config Node
Global config node that monitors CPU and RAM and adjusts all active Redis Syslog Consumer nodes’ concurrency based on hysteresis thresholds.
Config Properties
Property | Default | Description |
---|---|---|
Max Concurrency | 10 |
Upper bound for message‐parallelism |
Min Concurrency | 1 |
Lower bound for parallelism |
Sampling Size | 10 |
Number of samples in sliding window |
Sampling Interval (ms) | 5000 |
Time between CPU/RAM checks |
Upper CPU Threshold (%) | 80 |
CPU% above which concurrency is halved |
Lower CPU Threshold (%) | 30 |
CPU% below which concurrency is doubled |
Upper RAM Threshold (%) | 80 |
RAM% above which concurrency is halved |
Lower RAM Threshold (%) | 30 |
RAM% below which concurrency is doubled |
Behavior
- Uses
pidusage
to sample process CPU and memory. - Maintains sliding‐window averages with weighted samples.
- Applies hysteresis:
- Decrease concurrency if CPU ≥ upper CPU threshold or RAM ≥ upper RAM threshold.
- Increase concurrency if both CPU < lower CPU threshold and RAM < lower RAM threshold.
- Doubles or halves the global concurrency, clamped between min/max.
📖 API Reference
StreamProcessor
const { StreamProcessor } = require("redis-syslog-consumer");
Constructor
new StreamProcessor(
url: string,
stream: string,
concurrency: number,
onMessageCallback: (msg: { payload: string; _redisId: string }) => void,
logger: (message: string, context?: string) => void
)
url
: Redis connection URIstream
: Redis stream keyconcurrency
: max parallel messagesonMessageCallback
: invoked with each parsed messagelogger
: Node-RED logger function
Methods
initialize(): Promise<void>
– start consuming (create group, drain backlog, poll).acknowledgeAndDelete(messageId: string): Promise<void>
– XACK + XDEL for a given message.setConcurrency(newConcurrency: number)
– dynamically adjust concurrency.
GlobalResource (under the hood)
const { GlobalResource } = require("redis-syslog-consumer/configuration");
- Used internally by the config node; not typically instantiated by users.
🛠 Example Flow
[
{
"id": "1",
"type": "redis-syslog-resource-manager",
"name": "ResourceManager",
"concurrencyMax": 20,
"concurrencyMin": 1,
"samplingCount": 15,
"samplingInterval": 3000,
"thresholdUpper": 75,
"thresholdLower": 25,
"thresholdUpperMem": 75,
"thresholdLowerMem": 25
},
{
"id": "2",
"type": "redis-syslog-consumer",
"stream": "syslog",
"url": "redis://127.0.0.1:6379",
"resource": "1",
"wires": [["3"], ["4"]]
}
]
Inject back to acknowledge:
{ "_redisId": "1680000000000-0", "_redisAck": true }
Contributing
Contributions are welcome! Please fork the repository, create a feature branch, and submit a pull request with a detailed explanation of changes.
Versioning
We use SemVer for versioning. For the versions available, see the tags on this repository.
Authors
- Paolo Fabris - Initial work - ubyte.it
See also the list of contributors who participated in this project.
⚖️ License
This project is licensed under the MIT License. See the LICENSE file for details.