node-red-syslog 1.1.2

Node-RED node to read a Redis stream with conditional acknowledgments.

npm install node-red-syslog

redis-syslog-consumer

⚙️ Prerequisites

  • Node-RED v2.0 or higher
  • Node.js v14 or higher
  • Redis v5.0 or higher (Streams support required)
  • A Redis server reachable from the Node-RED runtime environment

Node-RED node and config module for consuming Redis streams reliably using consumer groups, with back‑pressure, retry, and dynamic concurrency control based on system load.


📦 Installation

From the root folder of your Node-RED project, install the package:

npm install redis-syslog-consumer

Restart Node-RED to load the new nodes.


🚀 Usage

Once installed, two new nodes are available, a consumer node in the Syslog palette category and a config node:

  1. Redis Syslog Consumer (redis-syslog-consumer)
  2. Resource Manager (redis-syslog-resource-manager)

Redis Syslog Consumer Node

  • Inputs: 1
  • Outputs: 2 (output, logs)

Node Properties

Property         | Type   | Description
Name             | String | Optional label
Redis URL        | String | Connection URI (redis://host:port)
Stream Key       | String | Redis stream name (e.g. syslog)
Resource Manager | Config | Select your redis-syslog-resource-manager node

Behavior

  1. On deploy, it creates a StreamProcessor with:
    • A Redis client connection
    • A consumer‐group named grp:<stream>
    • A random consumer name name:<id>
  2. Initialization:
    • Creates or reuses the Redis consumer group
    • Drains existing backlog (XREADGROUP from 0)
    • Claims stale pending entries (older than 5 minutes)
    • Starts a continuous poll (BLOCK 30s) for new messages
  3. Message flow:
    • Each entry becomes a Node-RED message payload (msg.payload) with metadata (msg._redisId).
    • To acknowledge an entry and delete it from the stream, send an object { _redisId, _redisAck: true } back into the node's input.
  4. Logging: lifecycle events, errors, and debug messages can be emitted on the second output (logs).
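
The initialization steps above correspond to a handful of Redis stream commands. The following is a rough sketch of their argument lists, not the package's actual code. The group name, the 5-minute idle threshold, and the 30-second block come from the list above; the function names, the COUNT value, the "0" start ID for the group, and the use of MKSTREAM are assumptions made for illustration. XPENDING followed by XCLAIM is used for the stale-entry step because both are available in Redis 5.0.

```javascript
// Sketch only: argument lists for the Redis commands behind the steps above.
// Function names, COUNT, the "0" group start ID and MKSTREAM are assumptions,
// not taken from the package source.
const STALE_MS = 5 * 60 * 1000; // "older than 5 minutes"
const BLOCK_MS = 30 * 1000;     // "BLOCK 30s"

function groupName(stream) {
  return `grp:${stream}`;
}

function buildCommands(stream, consumer, count = 10) {
  const group = groupName(stream);
  return {
    // Fails with a BUSYGROUP error if the group exists; treat that as "reuse".
    createGroup: ["XGROUP", "CREATE", stream, group, "0", "MKSTREAM"],
    // ID "0" re-reads entries delivered to this consumer but not yet acknowledged.
    drainBacklog: ["XREADGROUP", "GROUP", group, consumer,
                   "COUNT", String(count), "STREAMS", stream, "0"],
    // Lists pending entries so that stale ones can be picked out and claimed.
    listPending: ["XPENDING", stream, group, "-", "+", String(count)],
    // Claims the given entry IDs if they have been idle for at least STALE_MS.
    claimStale: (ids) =>
      ["XCLAIM", stream, group, consumer, String(STALE_MS), ...ids],
    // ID ">" asks for entries never delivered to any consumer in the group.
    poll: ["XREADGROUP", "GROUP", group, consumer,
           "COUNT", String(count), "BLOCK", String(BLOCK_MS),
           "STREAMS", stream, ">"],
  };
}
```

Each array can be passed to a client's raw-command method, for example `sendCommand` in node-redis v4.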

Resource Manager Config Node

A global config node that monitors CPU and RAM usage and adjusts the concurrency of all active Redis Syslog Consumer nodes using hysteresis thresholds.

Config Properties

Property                | Default | Description
Max Concurrency         | 10      | Upper bound for message parallelism
Min Concurrency         | 1       | Lower bound for parallelism
Sampling Size           | 10      | Number of samples in sliding window
Sampling Interval (ms)  | 5000    | Time between CPU/RAM checks
Upper CPU Threshold (%) | 80      | CPU% above which concurrency is halved
Lower CPU Threshold (%) | 30      | CPU% below which concurrency is doubled
Upper RAM Threshold (%) | 80      | RAM% above which concurrency is halved
Lower RAM Threshold (%) | 30      | RAM% below which concurrency is doubled

Behavior

  • Uses pidusage to sample process CPU and memory.
  • Maintains sliding‐window averages with weighted samples.
  • Applies hysteresis:
    • Decrease concurrency if CPU ≥ upper CPU threshold or RAM ≥ upper RAM threshold.
    • Increase concurrency if both CPU < lower CPU threshold and RAM < lower RAM threshold.
  • Doubles or halves the global concurrency, clamped between min/max.
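
The adjustment rule above can be sketched as a few pure functions. This is an illustration under stated assumptions, not the config node's implementation: the function names and the shape of `cfg` are hypothetical, and the linear weighting (newer samples count more) is a guess, since the weights are not specified here. The default values mirror the Config Properties table.

```javascript
// Defaults taken from the Config Properties table; the field names are hypothetical.
const defaults = {
  max: 10, min: 1, windowSize: 10,
  cpuUpper: 80, cpuLower: 30, ramUpper: 80, ramLower: 30,
};

// Append a sample and drop the oldest once the window is full.
function pushSample(window, value, size) {
  window.push(value);
  if (window.length > size) window.shift();
  return window;
}

// Weighted average where newer samples weigh more.
// ASSUMPTION: linear weights 1, 2, 3, ... are used only for illustration.
function weightedAverage(samples) {
  let sum = 0;
  let weights = 0;
  samples.forEach((value, i) => {
    const w = i + 1;
    sum += value * w;
    weights += w;
  });
  return weights === 0 ? 0 : sum / weights;
}

// Hysteresis: halve when either metric is at or above its upper threshold,
// double only when both are below their lower thresholds, otherwise hold.
function nextConcurrency(current, cpu, ram, cfg = defaults) {
  const overloaded = cpu >= cfg.cpuUpper || ram >= cfg.ramUpper;
  const idle = cpu < cfg.cpuLower && ram < cfg.ramLower;
  let next = current;
  if (overloaded) next = Math.floor(current / 2);
  else if (idle) next = current * 2;
  // Clamp to the configured bounds.
  return Math.min(cfg.max, Math.max(cfg.min, next));
}
```

The band between the lower and upper thresholds is what keeps the value from oscillating: readings in that range leave concurrency unchanged.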

📖 API Reference

StreamProcessor

const { StreamProcessor } = require("redis-syslog-consumer");

Constructor

new StreamProcessor(
  url: string,
  stream: string,
  concurrency: number,
  onMessageCallback: (msg: { payload: string; _redisId: string }) => void,
  logger: (message: string, context?: string) => void
)

  • url: Redis connection URI
  • stream: Redis stream key
  • concurrency: max parallel messages
  • onMessageCallback: invoked with each parsed message
  • logger: Node-RED logger function

Methods

  • initialize(): Promise<void> – start consuming (create group, drain backlog, poll).
  • acknowledgeAndDelete(messageId: string): Promise<void> – XACK + XDEL for a given message.
  • setConcurrency(newConcurrency: number) – dynamically adjust concurrency.
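
As a minimal sketch of conditional acknowledgment built on the methods above: an entry is acknowledged only after its handler succeeds, and is otherwise left pending so it can be retried or claimed later. `processThenAck` and `handler` are hypothetical names, and `processor` is assumed to be an initialized StreamProcessor or any object with the same `acknowledgeAndDelete` method.

```javascript
// Run a user-supplied handler, then acknowledge only on success.
// `processThenAck` and `handler` are hypothetical; only acknowledgeAndDelete
// and the { payload, _redisId } message shape come from the reference above.
async function processThenAck(processor, msg, handler) {
  try {
    await handler(msg.payload);
  } catch (err) {
    // No ack: the entry stays in the group's pending list.
    return { acked: false, error: err };
  }
  // XACK + XDEL for this entry.
  await processor.acknowledgeAndDelete(msg._redisId);
  return { acked: true };
}
```

A caller might invoke this from the `onMessageCallback` passed to the constructor, using the returned `acked` flag for logging.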

GlobalResource (under the hood)

const { GlobalResource } = require("redis-syslog-consumer/configuration");
  • Used internally by the config node; not typically instantiated by users.

🛠 Example Flow

[
  {
    "id": "1",
    "type": "redis-syslog-resource-manager",
    "name": "ResourceManager",
    "concurrencyMax": 20,
    "concurrencyMin": 1,
    "samplingCount": 15,
    "samplingInterval": 3000,
    "thresholdUpper": 75,
    "thresholdLower": 25,
    "thresholdUpperMem": 75,
    "thresholdLowerMem": 25
  },
  {
    "id": "2",
    "type": "redis-syslog-consumer",
    "stream": "syslog",
    "url": "redis://127.0.0.1:6379",
    "resource": "1",
    "wires": [["3"], ["4"]]
  }
]

To acknowledge an entry, send a message like this back into the consumer node's input:

{ "_redisId": "1680000000000-0", "_redisAck": true }
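
Within a flow, the acknowledgment message would typically be built by a Function node placed after the processing step. The helper below is a sketch of that logic; `buildAck` and the `succeeded` flag are hypothetical, and it assumes the consumer node reads `_redisId` and `_redisAck` as top-level message properties, as in the example above.

```javascript
// Build the acknowledgment message for a processed entry.
// Returns null when there is nothing to acknowledge; in a Node-RED Function
// node, returning null sends no message onward.
function buildAck(msg, succeeded) {
  if (!succeeded || !msg || typeof msg._redisId !== "string") {
    return null;
  }
  return { _redisId: msg._redisId, _redisAck: true };
}

// Inside a Function node the body could end with something like:
//   return buildAck(msg, msg.processed === true);
// where `msg.processed` is a hypothetical flag set by an earlier node.
```

Wiring that Function node's output back to the consumer node's input closes the loop.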

Contributing

Contributions are welcome! Please fork the repository, create a feature branch, and submit a pull request with a detailed explanation of changes.


Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.


Authors

  • Paolo Fabris - Initial work - ubyte.it

See also the list of contributors who participated in this project.


⚖️ License

This project is licensed under the MIT License. See the LICENSE file for details.

Node Info

Version: 1.1.2

Nodes

  • redis-syslog-resource-manager
  • redis-syslog-consumer

Keywords

  • node-red
  • redis
  • stream
  • ack
  • syslog
