@inteli.city/node-red-contrib-rabbit-mq 1.0.1
Node-RED nodes for producing, consuming and acknowledging RabbitMQ messages
@inteli.city/node-red-contrib-rabbit-mq
Node-RED nodes for consuming, producing, and acknowledging RabbitMQ messages via AMQP.
Table of Contents
Overview
These three nodes implement the consume → process → ack pattern for RabbitMQ.
mq.consumerconnects to a queue and emits messages into the Node-RED flowmq.ackwacknowledges the message after processing — requiredmq.producerpublishes messages to an exchange with a routing key
The consumer operates in manual ack mode. RabbitMQ only removes a message from the queue when mq.ackw is called. Without an ack, the message stays in-flight until the timeout fires and is then requeued.
Nodes
mq.consumer
Connects to a RabbitMQ queue and emits one message per delivery.
Parameters
| Field | Description |
|---|---|
| Host | Broker address |
| User / Password | AMQP credentials |
| Queue | Queue name (asserted as durable on connect) |
| Prefetch | Maximum number of unacknowledged messages at once |
| Timeout (min) | Maximum time before automatic nack. 0 uses the default of 120 minutes (2 hours). Any positive value is used as-is |
| SSL/TLS | Use amqps:// — required for cloud brokers |
| Debug mode | Enables verbose logs |
Behavior
- Connects automatically on deploy
- On failure: reconnects with exponential backoff (1s → 2s → ... → 60s)
- Prefetch limits parallelism: with
prefetch=1, the consumer only receives the next message after acking the current one - Timeout: if the flow does not call
mq.ackwwithin the configured window, the message is automatically nacked and requeued
Output
Each emitted message contains:
msg.payload → message content (JSON or string)
msg.rabbitmq → internal context (do not modify)
msg._msgid → delivery identifier
Warning:
msg.rabbitmqmust be passed intact tomq.ackw. Any node that recreates themsgobject (e.g.return { payload: ... }) will destroy this context.
mq.ackw
Acknowledges a message to RabbitMQ.
When to use
Always, after processing a message consumed by mq.consumer.
What it does
- Calls
channel.ack(message)on the broker - Cancels the message timeout timer
- Passes
msgdownstream (nodes can be chained after the ack)
Consequences of not using it
- The message remains "unacked" in RabbitMQ
- The prefetch window fills up after N messages (where N = configured prefetch)
- The consumer silently stops receiving new messages
- After the timeout, the message is nacked and requeued — the cycle repeats
mq.producer
Publishes messages to a RabbitMQ exchange.
Parameters
| Field | Description |
|---|---|
| Host | Broker address |
| User / Password | AMQP credentials |
| Exchange | Exchange name (asserted as direct and durable on connect) |
| Routing Key | Default routing key |
| SSL/TLS | Use amqps:// — required for cloud brokers |
| Debug mode | Enables publish logs |
Connection behavior
- Connects on deploy. On failure: exponential backoff (1s → 30s)
- Uses a confirm channel: only advances in the flow after the broker confirms receipt
msg.exchangeandmsg.routingKeyoverride the configured values per message
When disconnected
Messages are dropped. If the producer is not connected when a message arrives, the message is lost and a
warnis written to the logs.
There is no internal buffer. Messages arriving during reconnection are discarded.
Basic Flow
mq.consumer → [processing] → mq.ackw
mq.consumerreceives the message and emits it- The flow processes it (function node, HTTP request, database, etc.)
mq.ackwconfirms to RabbitMQ that the message was handled
Ack is mandatory. Without it:
message delivered → not acked → timeout (≥5 min) → nack → requeued → delivered again
System Behavior
Automatic reconnect All nodes reconnect automatically with exponential backoff. Restarting Node-RED is not required.
Prefetch
Controls how many messages can be in-flight simultaneously per consumer. With prefetch=1, the flow is strictly sequential. With prefetch=N, up to N messages can be processed in parallel — but all must be acked.
Blocking due to missing ack If all prefetch slots are occupied by unacked messages, RabbitMQ stops delivering new messages to that consumer. No error is raised — the consumer simply goes silent.
Redelivery
Nacked or timed-out messages are requeued and redelivered. The field msg.rabbitmq.message.fields.redelivered indicates whether a message is a redelivery.
Important Decisions
The producer drops messages when disconnected. There is no internal buffer. If guaranteed delivery is required, implement persistence in the flow before the producer.
The system is at-least-once. The same message may be delivered more than once (after nack, reconnection, or timeout). Flows should be idempotent or detect duplicates.
Ack is the flow's responsibility.
mq.consumerdoes not ack automatically. Every path through the flow must reachmq.ackw— including error paths.
Common Issues
Consumer stopped receiving messages → Prefetch is saturated. Most common cause:
mq.ackwis not being reached in the flow (disconnected node, silent error, ormsg.rabbitmqdestroyed by a function node).Duplicate messages → Expected behavior. Indicates a nack or reconnection occurred before the ack.
Messages disappearing in the producer → The producer was disconnected. Messages arriving during reconnection are discarded.
msg.rabbitmqis undefined in mq.ackw → An upstream node recreated themsgobject without preservingmsg.rabbitmq. Fix by usingmsg.payload = ...; return msg;instead ofreturn { payload: ... }.Consumer connects but receives no messages after restart → Check if the queue has messages with "unacked" status from a previous session. They will be requeued once the old connection expires on the broker.
Debug
Debug mode is enabled via checkbox in mq.consumer and mq.producer.
When enabled, the following logs appear:
mq.consumer:
- Connection details (URL, channel, queue, consumerTag)
- Each message received (msgid, deliveryTag)
- Inflight size and prefetch state
- 10s timer to detect flows missing an ack
mq.producer:
- Connection established
- Each publish (exchange, routing key)
When to enable:
- To investigate why the consumer stopped receiving messages
- To confirm that messages are being published
- To trace the lifecycle of a specific message
Disable in production. Generates one log entry per message, creating excessive noise and unnecessary I/O.
Simple Example
Publishing messages
[inject (repeat: 1s)] → [mq.producer]
Configure mq.producer:
- Exchange:
X.Teste - Routing Key:
R.Teste - SSL: enabled
Consuming and acknowledging messages
[mq.consumer] → [debug] → [mq.ackw]
Configure mq.consumer:
- Queue:
X.Teste - Prefetch:
1 - Timeout:
0(uses the 2-hour default) - SSL: enabled
The debug node displays msg.payload. The mq.ackw confirms to the broker. Without mq.ackw at the end, the consumer will stall after the first message (prefetch=1).