@corentin.pasquier/node-red-contrib-amqp 1.0.3
Node-RED AMQP nodes to connect to AMQP broker
node-red-contrib-amqp
Nœuds Node-RED robustes pour RabbitMQ/AMQP avec reconnexion automatique, logging détaillé et gestion avancée des erreurs.
Fonctionnalités
- Reconnexion automatique : Gère automatiquement les coupures de connexion et se reconnecte au broker
- ACK/NACK manuel : Contrôle total sur l'acquittement des messages
- Confirmation de publication : Mode confirm pour garantir la livraison des messages
- Dead Letter Exchange : Support complet pour les messages rejetés
- Logging détaillé : Suivi complet de toutes les opérations
- Prefetch configurable : Contrôle du nombre de messages non acquittés
- Propriétés AMQP complètes : Support de toutes les propriétés standard (persistent, priority, expiration, headers, etc.)
- Gestion des coupures pendant consommation : Reprise propre après reconnexion
Installation
npm install @corentin.pasquier/node-red-contrib-amqp
Ou depuis Node-RED : Menu → Manage palette → Install → Rechercher "@corentin.pasquier/node-red-contrib-amqp"
Nœuds disponibles
Ce module fournit 6 nœuds :
- AMQP Broker - Nœud de configuration pour la connexion au broker
- AMQP In - Consumer (consommation de messages depuis une queue)
- AMQP Out - Publisher (publication de messages vers un exchange)
- AMQP ACK - Acquittement de messages (approche visuelle)
- AMQP NACK - Rejet négatif avec option de requeue (approche visuelle)
- AMQP REJECT - Rejet de messages (approche visuelle)
AMQP Broker (Configuration)
Nœud de configuration qui gère la connexion au serveur RabbitMQ. Partagé entre tous les nœuds consumer/publisher.
Configuration :
- Host : Adresse du serveur RabbitMQ (défaut : localhost)
- Port : Port de connexion (défaut : 5672, ou 5671 pour TLS)
- Virtual Host : Virtual host RabbitMQ (défaut : /)
- Username : Nom d'utilisateur (défaut : guest)
- Password : Mot de passe (stocké de manière sécurisée)
- Use TLS : Activer le chiffrement TLS/SSL
- Reconnect Time : Délai entre les tentatives de reconnexion en ms (défaut : 5000)
- Heartbeat : Intervalle de heartbeat en secondes (défaut : 60)
AMQP In (Consumer)
Consomme des messages depuis une queue RabbitMQ avec ACK/NACK manuel.
Configuration :
- Queue : Nom de la queue à consommer
- Prefetch : Nombre maximum de messages non acquittés (QoS)
- Exchange (optionnel) : Exchange auquel binder la queue
- Exchange Type : Type d'exchange (direct, topic, fanout, headers)
- Routing Key : Clé de routage pour le binding (supporte les wildcards pour topic)
- Durable : Queue durable (survit au redémarrage du broker)
- Auto Delete : Suppression automatique quand plus de consumers
- Dead Letter Exchange (optionnel) : Exchange pour les messages rejetés
- DLX Routing Key (optionnel) : Routing key pour le DLX
Message sortant :
{
payload: {...}, // Contenu du message (JSON parsé si possible)
fields: { // Champs AMQP
deliveryTag: 1,
exchange: "...",
routingKey: "...",
redelivered: false
},
properties: {...}, // Propriétés AMQP
ack: function(), // Fonction pour acquitter le message
nack: function(requeue), // Fonction pour rejeter avec option de requeue
reject: function(requeue) // Fonction pour rejeter le message
}
Exemples d'utilisation :
// ACK simple
msg.ack();
return msg;
// NACK avec requeue
if (erreur_temporaire) {
msg.nack(true); // Requeue le message
}
// NACK sans requeue (envoi vers DLX si configuré)
if (erreur_permanente) {
msg.nack(false); // Vers DLX
}
// Reject
msg.reject(false); // Rejette sans requeue
AMQP ACK
Nœud dédié pour acquitter un message reçu via AMQP In.
Utilisation :
- Connecter après un nœud AMQP In (ou après traitement)
- Appelle automatiquement
msg.ack() - Permet une visualisation claire du flow d'acquittement
- Affiche un compteur de messages acquittés
- Passe le message au nœud suivant après ACK
Avantage : Rend le flow plus lisible qu'un appel de fonction dans un function node.
AMQP NACK
Nœud dédié pour rejeter négativement un message avec option de requeue.
Configuration :
- Requeue : Si coché, le message est remis en queue, sinon envoyé vers DLX (défaut : non coché)
Utilisation :
- Connecter après un nœud AMQP In
- Appelle automatiquement
msg.nack(requeue) - Le paramètre
requeuepeut être surchargé parmsg.requeue - Affiche un compteur et l'état (requeued ou to DLX)
Cas d'usage :
- NACK avec requeue=true : Erreur temporaire (service indisponible, retry plus tard)
- NACK avec requeue=false : Erreur permanente, envoi vers DLX pour analyse
AMQP REJECT
Nœud dédié pour rejeter un message (similaire à NACK mais pour un seul message).
Configuration :
- Requeue : Si coché, le message est remis en queue, sinon envoyé vers DLX (défaut : non coché)
Utilisation :
- Identique à AMQP NACK dans le comportement
- Sémantiquement différent : REJECT est plus explicite sur le rejet
- Utile pour distinguer visuellement les différents chemins d'erreur dans le flow
Note : Dans AMQP, NACK et REJECT sont fonctionnellement similaires pour un message unique.
AMQP Out (Publisher)
Publie des messages vers un exchange RabbitMQ avec confirmation optionnelle.
Configuration :
- Exchange : Nom de l'exchange cible
- Exchange Type : Type d'exchange (direct, topic, fanout, headers)
- Routing Key : Clé de routage par défaut (peut être surchargée)
- Durable : Exchange durable
- Confirm Mode : Attendre l'ACK du broker (recommandé)
- Persistent : Messages persistants (survivent au redémarrage)
- Mandatory : Retourner le message si non routé vers une queue
Message entrant :
Configuration depuis le message :
msg.payload = {...}; // Données à publier (sera JSON stringifié)
msg.exchange = "mon-exchange"; // Surcharge l'exchange configuré
msg.routingKey = "ma.cle"; // Surcharge la routing key
msg.topic = "ma.cle"; // Alternative à routingKey
// Propriétés AMQP optionnelles
msg.persistent = true; // Message persistant
msg.priority = 5; // Priorité 0-9
msg.expiration = "60000"; // TTL en ms (string)
msg.headers = { // Headers personnalisés
source: "web-app",
version: "1.0"
};
msg.correlationId = "12345"; // Pour pattern RPC
msg.replyTo = "rpc.replies"; // Queue de réponse RPC
msg.messageId = "msg-001"; // ID unique
msg.contentType = "application/json";
msg.contentEncoding = "utf-8";
msg.timestamp = Date.now();
msg.type = "order.created";
msg.userId = "user123";
msg.appId = "my-app";
Exemples de flow
Exemple 1a : Consumer simple avec ACK (approche function)
[amqp-in] → [function: traiter] → [debug]
// Dans le function node
if (msg.payload.valid) {
msg.ack();
node.log("Message traité avec succès");
} else {
msg.nack(false); // Vers DLX
node.error("Message invalide");
}
return msg;
Exemple 1b : Consumer simple avec ACK (approche visuelle avec nœuds dédiés)
┌→ [function: traiter] → [amqp-ack] → [debug]
[amqp-in] → [switch]│
└→ [function: logger] → [amqp-nack] → [debug]
// Dans le switch node : router selon msg.payload.valid
// Output 1 (valid=true) : vers traitement puis ACK
// Output 2 (valid=false) : vers logging puis NACK (DLX)
Avantage de l'approche visuelle : Le flow est plus facile à comprendre en un coup d'œil, idéal pour les architectures complexes.
Exemple 2 : Publisher avec propriétés
[inject] → [function: préparer] → [amqp-out]
// Dans le function node
msg.payload = {
orderId: 12345,
amount: 99.99
};
msg.routingKey = "orders.new";
msg.priority = 5;
msg.persistent = true;
msg.headers = {
source: "web-app",
timestamp: new Date().toISOString()
};
return msg;
Exemple 3 : Pattern RPC (Request/Reply)
Requête :
[inject] → [function: request] → [amqp-out]
msg.payload = { query: "getUser", userId: 123 };
msg.routingKey = "rpc.requests";
msg.replyTo = "rpc.replies." + flow.get("instanceId");
msg.correlationId = msg._msgid;
return msg;
Réponse :
[amqp-in: queue=rpc.replies.*] → [function: response] → [debug]
const correlationId = msg.properties.correlationId;
// Traiter la réponse selon correlationId
msg.ack();
return msg;
Exemple 4 : Retry avec DLX
Configuration :
- Exchange principal :
orders - Queue principale :
orders.processingavec DLX =orders.dlx - Exchange DLX :
orders.dlx - Queue retry :
orders.retry(bindée àorders.dlx) avec TTL et DLX =orders
Après le TTL, les messages retournent automatiquement vers orders pour un nouveau traitement.
Gestion de la reconnexion
Les nœuds gèrent automatiquement la reconnexion en cas de perte de connexion :
- Détection de coupure : Le broker détecte la fermeture de connexion
- Notification : Tous les nœuds (consumer/publisher) sont notifiés
- Nettoyage : Les channels sont fermés proprement
- Reconnexion automatique : Tentatives toutes les X ms (configurable)
- Reprise : Une fois reconnecté, les nœuds reprennent automatiquement :
- Consumer : Recrée le channel et reprend la consommation
- Publisher : Recrée le channel et envoie les messages en file d'attente
Important pour les consumers : Les messages non acquittés au moment de la coupure seront automatiquement requeued par RabbitMQ et redistribués une fois la connexion rétablie.
Logging
Tous les événements importants sont loggés avec des préfixes clairs :
[AMQP Broker {id}]: Événements du broker (connexion, reconnexion)[AMQP In {id}]: Événements du consumer (consommation, ACK/NACK)[AMQP Out {id}]: Événements du publisher (publication, confirmation)
Niveaux de log :
- INFO : Événements normaux (connexion, message reçu/envoyé)
- WARN : Avertissements (reconnexion, buffer plein)
- ERROR : Erreurs (échec de connexion, erreur de publication)
Bonnes pratiques
Toujours acquitter les messages : Ne jamais oublier d'appeler
msg.ack(),msg.nack()oumsg.reject(). Les messages non acquittés bloquent la consommation (selon prefetch).Utiliser le mode confirm : Active la confirmation de publication pour garantir que les messages sont bien reçus par le broker.
Configurer un prefetch approprié :
- Prefetch = 1 : Garantie de traitement séquentiel, mais moins performant
- Prefetch > 1 : Meilleure performance, mais messages peuvent être redelivrés dans le désordre en cas de coupure
Utiliser Dead Letter Exchange : Configure un DLX pour gérer les messages en erreur sans les perdre.
Messages persistants pour données critiques : Active
persistent: truepour les messages qui ne doivent pas être perdus en cas de redémarrage du broker.Gérer les erreurs de traitement :
try {
// Traitement du message
resultat = traiterMessage(msg.payload);
msg.ack();
} catch (err) {
if (err.code === 'RETRY') {
msg.nack(true); // Requeue pour retry
} else {
msg.nack(false); // Vers DLX
}
node.error(err);
}
- Monitoring : Surveiller les logs pour détecter :
- Reconnexions fréquentes (problème réseau ou broker)
- Messages vers DLX (erreurs de traitement)
- Buffer plein (débit trop élevé)
Dépannage
Le consumer ne reçoit pas de messages
- Vérifier que la queue existe et contient des messages
- Vérifier le binding exchange/queue/routing key
- Vérifier les permissions de l'utilisateur
- Vérifier les logs pour détecter les erreurs de connexion
Les messages ne sont pas publiés
- Vérifier que l'exchange existe
- En mode confirm, vérifier les logs pour les erreurs d'ACK
- Vérifier que le routing key correspond à un binding existant (si mandatory=true)
Reconnexions fréquentes
- Vérifier la stabilité du réseau
- Augmenter le heartbeat si le réseau est lent
- Vérifier les logs du broker RabbitMQ pour détecter des erreurs côté serveur
Messages redelivrés en boucle
- Vérifier que
msg.ack()est bien appelé - Vérifier qu'il n'y a pas d'exception avant l'ACK
- Configurer un DLX avec TTL pour limiter les retries
Licence
ISC
Auteur
Corentin PASQUIER
Changelog
1.0.0
- Implémentation initiale
- Nœud AMQP Broker avec reconnexion automatique
- Nœud AMQP In avec ACK/NACK manuel
- Nœud AMQP Out avec mode confirm
- Nœuds dédiés AMQP ACK, NACK et REJECT pour une approche visuelle
- Support complet des propriétés AMQP
- Dead Letter Exchange
- Logging détaillé
- Diagnostics d'erreur avancés
- Indicateurs de statut visuels pour tous les nœuds
- Documentation complète (API, tests, troubleshooting)
- Tests unitaires avec Mocha