@inteli.city/node-red-contrib-rabbit-mq 1.0.1

Node-RED nodes for producing, consuming and acknowledging RabbitMQ messages

npm install @inteli.city/node-red-contrib-rabbit-mq



Overview

These three nodes implement the consume → process → ack pattern for RabbitMQ.

  • mq.consumer connects to a queue and emits messages into the Node-RED flow
  • mq.ackw acknowledges the message after processing — required
  • mq.producer publishes messages to an exchange with a routing key

The consumer operates in manual ack mode. RabbitMQ only removes a message from the queue when mq.ackw is called. Without an ack, the message stays in-flight until the timeout fires and is then requeued.


Nodes

mq.consumer

Connects to a RabbitMQ queue and emits one message per delivery.

Parameters

Field            Description
Host             Broker address
User / Password  AMQP credentials
Queue            Queue name (asserted as durable on connect)
Prefetch         Maximum number of unacknowledged messages at once
Timeout (min)    Maximum time before automatic nack. 0 uses the default of 120 minutes (2 hours). Any positive value is used as-is.
SSL/TLS          Use amqps:// (required for cloud brokers)
Debug mode       Enables verbose logs
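
The Timeout rule can be read as a small function. This is only a sketch of the documented behavior: effectiveTimeoutMs is a hypothetical name, and since the handling of negative or non-numeric input is not documented, falling back to the default in those cases is an assumption.

```javascript
const DEFAULT_TIMEOUT_MIN = 120; // documented default (2 hours)

// Hypothetical helper: converts the "Timeout (min)" field to milliseconds.
function effectiveTimeoutMs(timeoutMin) {
  const minutes = Number(timeoutMin);
  // 0 selects the default. Treating negative or invalid input the same way
  // is an assumption of this sketch, not documented behavior.
  const used = minutes > 0 ? minutes : DEFAULT_TIMEOUT_MIN;
  return used * 60 * 1000;
}

console.log(effectiveTimeoutMs(0));  // 7200000
console.log(effectiveTimeoutMs(15)); // 900000
```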

Behavior

  • Connects automatically on deploy
  • On failure: reconnects with exponential backoff (1s → 2s → ... → 60s)
  • Prefetch limits parallelism: with prefetch=1, the consumer only receives the next message after acking the current one
  • Timeout: if the flow does not call mq.ackw within the configured window, the message is automatically nacked and requeued
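
The reconnect schedule can be pictured with a short sketch. It assumes the delay doubles on every failed attempt and is capped at 60 seconds, which matches the 1s → 2s → ... → 60s sequence above; backoffDelayMs is a hypothetical helper, and the node may compute its delays differently.

```javascript
// Hypothetical helper: delay before reconnect attempt `attempt` (0-based).
// Assumes doubling with a cap; not taken from the node's source.
function backoffDelayMs(attempt, baseMs = 1000, capMs = 60000) {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

const delays = [];
for (let attempt = 0; attempt < 8; attempt++) {
  delays.push(backoffDelayMs(attempt));
}
console.log(delays.join(", "));
// 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
```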

Output

Each emitted message contains:

msg.payload     → message content (JSON or string)
msg.rabbitmq    → internal context (do not modify)
msg._msgid      → delivery identifier

Warning: msg.rabbitmq must be passed intact to mq.ackw. Any node that recreates the msg object (e.g. return { payload: ... }) will destroy this context.
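
A minimal sketch of the difference, written as plain functions so it can be run outside Node-RED. In a function node only the body would be used. The names transform and transformBroken, and the placeholder content of rabbitmq, are illustrative assumptions.

```javascript
// Safe: mutate the incoming msg so that msg.rabbitmq survives.
function transform(msg) {
  msg.payload = { received: msg.payload, processedAt: Date.now() };
  return msg;
}

// Anti-pattern, shown for contrast: the new object has no rabbitmq property,
// so mq.ackw downstream has nothing to acknowledge.
function transformBroken(msg) {
  return { payload: msg.payload };
}

// Stand-in for a message emitted by mq.consumer (real context omitted).
const incoming = { payload: "hello", rabbitmq: { placeholder: true }, _msgid: "abc" };

console.log(transform(incoming).rabbitmq !== undefined);       // true
console.log(transformBroken(incoming).rabbitmq !== undefined); // false
```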


mq.ackw

Acknowledges a message to RabbitMQ.

When to use

Always, after processing a message consumed by mq.consumer.

What it does

  • Calls channel.ack(message) on the broker
  • Cancels the message timeout timer
  • Passes msg downstream (nodes can be chained after the ack)

Consequences of not using it

  • The message remains "unacked" in RabbitMQ
  • The prefetch window fills up after N messages (where N = configured prefetch)
  • The consumer silently stops receiving new messages
  • After the timeout, the message is nacked and requeued — the cycle repeats
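
The stall can be illustrated with a toy model of the prefetch window. This is not the node's implementation; PrefetchWindow and its methods are hypothetical and only mimic the broker's rule that delivery stops while all slots hold unacked messages.

```javascript
// Toy model of a prefetch window, for illustration only.
class PrefetchWindow {
  constructor(prefetch) {
    this.prefetch = prefetch;
    this.unacked = new Set(); // deliveries waiting for an ack
  }

  // The broker delivers only while a slot is free.
  deliver(deliveryId) {
    if (this.unacked.size >= this.prefetch) return false;
    this.unacked.add(deliveryId);
    return true;
  }

  // An ack (or the timeout nack) frees the slot again.
  release(deliveryId) {
    return this.unacked.delete(deliveryId);
  }
}

const win = new PrefetchWindow(2);
console.log(win.deliver("m1")); // true
console.log(win.deliver("m2")); // true
console.log(win.deliver("m3")); // false: window is full, nothing more arrives
win.release("m1");              // the effect mq.ackw has for m1
console.log(win.deliver("m3")); // true
```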

mq.producer

Publishes messages to a RabbitMQ exchange.

Parameters

Field            Description
Host             Broker address
User / Password  AMQP credentials
Exchange         Exchange name (asserted as direct and durable on connect)
Routing Key      Default routing key
SSL/TLS          Use amqps:// (required for cloud brokers)
Debug mode       Enables publish logs

Connection behavior

  • Connects on deploy. On failure: exponential backoff (1s → 30s)
  • Uses a confirm channel: the flow continues only after the broker confirms receipt of the message
  • msg.exchange and msg.routingKey override the configured values per message
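
A sketch of a per-message override, as it might be written in a function node placed before mq.producer. The priority field and the names X.Alerts and R.Alerts.High are made up for illustration. Whether the node asserts an overriding exchange is not documented here, so the sketch assumes that exchange already exists on the broker.

```javascript
// Hypothetical routing rule: high-priority payloads go to another exchange.
function route(msg) {
  if (msg.payload && msg.payload.priority === "high") {
    msg.exchange = "X.Alerts";        // overrides the configured exchange
    msg.routingKey = "R.Alerts.High"; // overrides the configured routing key
  }
  // Other messages keep the values configured on the node.
  return msg;
}

console.log(route({ payload: { priority: "high" } }).exchange); // X.Alerts
console.log(route({ payload: { priority: "low" } }).exchange);  // undefined
```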

When disconnected

Messages are dropped. There is no internal buffer: if the producer is not connected when a message arrives (including during reconnection), the message is discarded and a warning is written to the logs.


Basic Flow

mq.consumer → [processing] → mq.ackw

  1. mq.consumer receives the message and emits it
  2. The flow processes it (function node, HTTP request, database, etc.)
  3. mq.ackw confirms to RabbitMQ that the message was handled

Ack is mandatory. Without it:

message delivered → not acked → timeout (configured value, 120 min by default) → nack → requeued → delivered again

System Behavior

Automatic reconnect: All nodes reconnect automatically with exponential backoff. Restarting Node-RED is not required.

Prefetch: Controls how many messages can be in flight simultaneously per consumer. With prefetch=1, the flow is strictly sequential. With prefetch=N, up to N messages can be processed in parallel, but all of them must be acked.

Blocking due to missing ack: If all prefetch slots are occupied by unacked messages, RabbitMQ stops delivering new messages to that consumer. No error is raised; the consumer simply goes silent.

Redelivery: Nacked or timed-out messages are requeued and redelivered. The field msg.rabbitmq.message.fields.redelivered indicates whether a message is a redelivery.
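
The redelivery flag can serve as a cheap first check before a duplicate lookup. The sketch below assumes each payload carries a business identifier in payload.id, which this package does not require. The in-memory Set stands in for whatever store a real flow would use, and the helper names are hypothetical.

```javascript
// Reads the documented flag; returns false when the context is missing.
function isRedelivery(msg) {
  return Boolean(msg.rabbitmq?.message?.fields?.redelivered);
}

// Hypothetical duplicate filter keyed on an id carried in the payload.
const processedIds = new Set();

function isDuplicate(msg) {
  return isRedelivery(msg) && processedIds.has(msg.payload.id);
}

function markProcessed(msg) {
  processedIds.add(msg.payload.id);
}

// Stand-ins for two deliveries of the same message.
const first = { payload: { id: "order-1" }, rabbitmq: { message: { fields: { redelivered: false } } } };
const again = { payload: { id: "order-1" }, rabbitmq: { message: { fields: { redelivered: true } } } };

console.log(isDuplicate(first)); // false
markProcessed(first);
console.log(isDuplicate(again)); // true
```

A message identified as a duplicate still has to reach mq.ackw; otherwise it occupies a prefetch slot until the timeout.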


Important Decisions

  • The producer drops messages when disconnected. There is no internal buffer. If guaranteed delivery is required, implement persistence in the flow before the producer.

  • The system is at-least-once. The same message may be delivered more than once (after nack, reconnection, or timeout). Flows should be idempotent or detect duplicates.

  • Ack is the flow's responsibility. mq.consumer does not ack automatically. Every path through the flow must reach mq.ackw — including error paths.
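
One way to keep error paths pointed at mq.ackw is to catch failures inside the processing step and always return the original msg. The sketch below is an assumption about how a flow could be written, not part of this package; processSafely and the processingError property are made-up names.

```javascript
// Hypothetical wrapper: runs `work` on the payload and never throws, so the
// returned msg can always be wired on to mq.ackw.
function processSafely(msg, work) {
  try {
    msg.payload = work(msg.payload);
  } catch (err) {
    // Record the failure instead of dropping the message.
    msg.processingError = err.message;
  }
  return msg; // same object, so msg.rabbitmq is preserved
}

const ok = processSafely({ payload: "2", rabbitmq: {} }, (p) => Number(p) * 2);
const failed = processSafely({ payload: "{bad json", rabbitmq: {} }, JSON.parse);

console.log(ok.payload);                    // 4
console.log(typeof failed.processingError); // string
console.log(failed.rabbitmq !== undefined); // true
```

A Node-RED Catch node wired to mq.ackw may be another option, provided the caught message still carries msg.rabbitmq.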


Common Issues

  • Consumer stopped receiving messages → Prefetch is saturated. Most common cause: mq.ackw is not being reached in the flow (disconnected node, silent error, or msg.rabbitmq destroyed by a function node).

  • Duplicate messages → Expected behavior. Indicates a nack or reconnection occurred before the ack.

  • Messages disappearing in the producer → The producer was disconnected. Messages arriving during reconnection are discarded.

  • msg.rabbitmq is undefined in mq.ackw → An upstream node recreated the msg object without preserving msg.rabbitmq. Fix by using msg.payload = ...; return msg; instead of return { payload: ... }.

  • Consumer connects but receives no messages after restart → Check if the queue has messages with "unacked" status from a previous session. They will be requeued once the old connection expires on the broker.


Debug

Debug mode is enabled via checkbox in mq.consumer and mq.producer.

When enabled, the following logs appear:

mq.consumer:

  • Connection details (URL, channel, queue, consumerTag)
  • Each message received (msgid, deliveryTag)
  • Inflight size and prefetch state
  • 10s timer to detect flows missing an ack

mq.producer:

  • Connection established
  • Each publish (exchange, routing key)

When to enable:

  • To investigate why the consumer stopped receiving messages
  • To confirm that messages are being published
  • To trace the lifecycle of a specific message

Disable in production. Debug mode generates one log entry per message, which creates excessive noise and unnecessary I/O.


Simple Example

Publishing messages

[inject (repeat: 1s)] → [mq.producer]

Configure mq.producer:

  • Exchange: X.Teste
  • Routing Key: R.Teste
  • SSL: enabled

Consuming and acknowledging messages

[mq.consumer] → [debug] → [mq.ackw]

Configure mq.consumer:

  • Queue: X.Teste
  • Prefetch: 1
  • Timeout: 0 (uses the 2-hour default)
  • SSL: enabled

The debug node displays msg.payload, and mq.ackw then acknowledges the message to the broker. Without mq.ackw at the end, the consumer will stall after the first message (prefetch=1).

Node Info

Version: 1.0.1
License: Apache-2.0