node-red-contrib-multiple-queue 0.2.1
A multiple message queue node for Node-RED
A Node-RED node providing multiple, independently controlled message queues
Install
Use the Node-RED Manage Palette command or run the following command in the Node-RED user directory (typically ~/.node-red):
npm install node-red-contrib-multiple-queue
Design
The m-queue node provides multiple, independently controlled message queues. Each queue behaves similarly to the queueing state of the node-red-contrib-queue-gate node. The requirement to control each queue individually, however, adds some complexity to the design, and this is reflected in the control mechanism. Queues are created as needed and destroyed when empty, except for the Default Queue (see below). Each queue is identified by a Name string, which is used to steer messages to it, where they are stored or used to control the queue. This name is matched against the Queue Selector string provided by default in the msg.topic property. The user can, however, select any other message property for this purpose, thus retaining flexibility. Similarly, all the other message properties used to control the queues are defined by default but can be customized in the node edit dialog.
Usage
Queueing messages
An incoming message is added to the queue identified by the Queue Selector. If no such queue exists, one is created. If the Queue Selector is undefined or an empty string, the message is added to the Default Queue, which is always available. A message to be queued or a control message (see below) can be sent to all queues by setting the Queue Selector to the value defined as All Queues.
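The routing rule described above can be sketched in plain JavaScript. This is only an illustration of the documented behavior, not the node's source: the helper name selectQueues is hypothetical, and the names "default" and "all" are the values used in the example flows below (both are configurable in the edit dialog).

```javascript
// Hypothetical helper: return the names of the queue(s) a message is steered to.
// `existing` is the list of queue names that currently exist.
function selectQueues(msg, existing, opts = {}) {
    const selectorProp = opts.selectorProp || "topic";   // Queue Selector property
    const defaultQueue = opts.defaultQueue || "default"; // assumed Default Queue name
    const allQueues = opts.allQueues || "all";           // assumed All Queues name
    const selector = msg[selectorProp];

    // Undefined or empty selector: use the Default Queue.
    if (selector === undefined || selector === "") {
        return [defaultQueue];
    }
    // The All Queues value addresses every existing queue.
    if (selector === allQueues) {
        return [...existing];
    }
    // Otherwise a single named queue (the caller would create it if missing).
    return [selector];
}
```

For instance, a message with topic "queue1" is steered to "queue1" only, while a message with no topic goes to the Default Queue.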
Controlling message queues
Messages with the user-defined property Control Flag set to true (or a JavaScript equivalent) are not queued but are used to control the queues. These control messages can have payloads (case-insensitive strings) that the user has defined to represent commands for trigger, pause, resume, flush, peek, drop, reset, status, maximum, newest, protect, and delete. If a control message is received with a payload that is a number or boolean, the payload is converted to a string and then tested against the command definitions. If a control message is received but not recognized, there is no output or change of state, and the node issues a warning.
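As a rough sketch of how a control message might be recognized under these rules, consider the following. It assumes the Control Flag property is msg.control and that each command string equals the command name, as in the example flows below; the function name parseControl and the returned object shape are hypothetical.

```javascript
// Assumed command strings (the defaults used in the example flows).
const COMMANDS = ["trigger", "pause", "resume", "flush", "peek", "drop",
                  "reset", "status", "maximum", "newest", "protect", "delete"];

// Hypothetical helper: classify a message and extract its command, if any.
function parseControl(msg, controlProp = "control") {
    // Any truthy Control Flag marks a control message.
    if (!msg[controlProp]) {
        return { control: false };
    }
    let payload = msg.payload;
    // Numbers and booleans are converted to strings before matching.
    if (typeof payload === "number" || typeof payload === "boolean") {
        payload = String(payload);
    }
    if (typeof payload !== "string") {
        return { control: true, command: null };
    }
    // Matching is case-insensitive; null stands for "not recognized",
    // the case in which the node would issue a warning.
    const lower = payload.toLowerCase();
    const command = COMMANDS.find((c) => c === lower) || null;
    return { control: true, command };
}
```

A message such as { control: true, payload: "FLUSH" } would be treated as a flush command, while { control: true, payload: 42 } would be tested as the string "42" and not recognized.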
Each queue responds independently to control messages addressed to it by performing the following actions:
If a queue has been paused by the pause command, it will no longer queue incoming messages or accept most commands. Only commands that do not affect the contents of a queue will be executed while it is paused. These are indicated by an asterisk in the table above.
By default, messages arriving when a queue is full are discarded, so that the queue contains the oldest messages. The user can, however, set the Keep newest messages checkbox in order to have the Default Queue and all new queues created with the opposite behavior: new messages are added to the queue (at the tail), while discarding the oldest message (from the head), with the result that the queue contains the most recent messages. This behavior can be controlled on an individual queue basis by addressing a newest control message to that queue with the value property set to true or false. If the value is undefined, the Keep newest messages property will be set to its default. Changes to the Keep newest messages property do not affect messages already in the queue.
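The two overflow policies can be illustrated with a small bounded-queue sketch. This is not the node's implementation; the function name enqueue and its signature are assumptions, and a maximum size of zero or less is treated as "no limit", as described under Memory Management.

```javascript
// Hypothetical helper: add msg to a bounded queue (an array, head at index 0).
// Returns the message that was discarded, or null if nothing was discarded.
function enqueue(queue, msg, maxSize, keepNewest) {
    // No limit, or room available: append at the tail.
    if (maxSize <= 0 || queue.length < maxSize) {
        queue.push(msg);
        return null;
    }
    // Queue full, default policy: discard the incoming message.
    if (!keepNewest) {
        return msg;
    }
    // Queue full, keep-newest policy: drop the head, append at the tail.
    const dropped = queue.shift();
    queue.push(msg);
    return dropped;
}

// Same input, two policies.
const keepOldest = ["a", "b"];
enqueue(keepOldest, "c", 2, false); // queue still holds "a", "b"

const keepNewestQueue = ["a", "b"];
enqueue(keepNewestQueue, "c", 2, true); // queue now holds "b", "c"
```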
Memory Management
To prevent excessive memory usage, the node is configured by default to limit the size of message queues and to delete queues when they are empty. The Default Queue and all new queues will be created with the limit specified in the edit dialog, which can be changed by addressing a maximum command to one or all queues and providing the maximum value in the value property. There may be use cases (see the Retain Until Processed example below) where it is helpful to override these defaults. The queue size limit is eliminated by entering a Maximum Queue Size value of zero or less. A queue will not be deleted when empty if its Protect attribute is set to true. These values can be made the default for all queues in the edit dialog or changed for individual queues by using the maximum and protect commands. If the value property for either command is undefined, the default setting will be used for the queue. Changes to the maximum queue size do not affect messages already in the queue. Note that the Default Queue is created with its Protect attribute set to true, which cannot be modified. Hence, this queue can never be deleted.
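As an illustration, the control messages for these commands might be built as follows. The property names topic, control, and value and the queue names "queue1" and "all" are taken from the example flows below and are configurable; the helper controlMsg is hypothetical.

```javascript
// Hypothetical helper: build a control message for one queue (or all queues).
function controlMsg(queue, command, value) {
    const msg = { topic: queue, control: true, payload: command };
    // Leaving value undefined asks the node to apply the default setting.
    if (value !== undefined) {
        msg.value = value;
    }
    return msg;
}

const msgs = [
    controlMsg("queue1", "maximum", 0),    // remove the size limit on queue1
    controlMsg("queue1", "protect", true), // keep queue1 even when it is empty
    controlMsg("all", "maximum")           // restore the default limit on all queues
];
```

In a function node, messages like these could be passed to node.send() and wired to the m-queue input.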
Despite these precautions, it is possible to consume excessive amounts of memory if messages arrive with new (previously unseen) values of the Queue Selector, thus generating very many queues. The user is advised to monitor the number of active queues indicated by the node status text (see below) and to take appropriate action if it becomes unexpectedly large.
Node status
A limited indication of node activity is provided by the status text, which displays the number of queues in operation and the total number of messages queued. When a status command is received, the status text is refreshed so that it can be caught by a status node. In addition, if the Send status to second output checkbox is selected in the edit dialog, the node is deployed with a second output that sends a complete description of the selected queue(s) as an object of the form:
This is shown in the second flow in the Node Status example below.
Persistence
The state of the node (all queues and their contents) is maintained in the node context. If a persistent (non-volatile) form of context storage is available, the user has the option of restoring the state from that storage after a restart of Node-RED. This is done by activating the Restore from state saved in option (checkbox) in the edit dialog and choosing a non-volatile storage module from the adjacent dropdown list, which shows all the storage modules enabled in the Node-RED settings.js file.
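For reference, a non-volatile store is typically enabled through the contextStorage property in settings.js. The fragment below is a minimal sketch: "memory" and "localfilesystem" are Node-RED's built-in context store modules, while the store names default and file are only illustrative choices.

```javascript
// Fragment of settings.js (other settings omitted).
module.exports = {
    contextStorage: {
        // Volatile store, lost when Node-RED restarts.
        default: { module: "memory" },
        // Non-volatile store; the name "file" is an arbitrary label
        // that would appear in the node's dropdown list.
        file: { module: "localfilesystem" }
    }
};
```

With a configuration like this, the file store would be the one to select for restoring queue state after a restart.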
Examples
Basic Operation
This flow demonstrates the basic operation of the m-queue node and the commands that can be used to change or display its state or manage the queue.
[{"id":"75a2f227.f318d4","type":"tab","label":"m-queue-demo","disabled":false,"info":""},{"id":"2adbd30f.24601c","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["26c05686.a92e92","7a7221f7.febf3","34529be4.051ac4","a057816c.0028a8","98b3dc41.f34378","cd879e3a.5c237"],"x":574,"y":359,"w":342,"h":242},{"id":"5f7860c0.3d0d88","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["7f658eda.a31058","8005b7c4.45eb38","1890649a.137773","25632026.e297c8","24c8146a.437ab4"],"x":54,"y":539,"w":432,"h":142},{"id":"d73c5c87.a36288","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["7331f684.0d42e8","9a8b71f3.b3b6a8","5c857d77.ef0034","a34c033a.3c84c","7ee4d0ac.0358a","29489dd3.db6d2a","54480c33.ce1c9c","6e0fa707.f6df7"],"x":514,"y":59,"w":442,"h":282},{"id":"f65a40d2.c66fd","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["2e81b27e.38c676","d4a8baa4.2ed418","e2d5eba0.e82968","62b8ebe6.e34254","2968d6c2.1c426a","524b3a66.c598fc","d475141f.f79e6","7bdcc2a.3d5bf3c","9413257a.f34068","52ea570d.d57e08","1d0498ae.977b07","8f51fc27.7461a","d90cbf00.23cf"],"x":34,"y":43,"w":462,"h":482},{"id":"2e81b27e.38c676","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"input","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"","payloadType":"date","x":130,"y":304,"wires":[["8f51fc27.7461a"]]},{"id":"7331f684.0d42e8","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","nam
e":"input","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"","payloadType":"date","x":610,"y":200,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"d4a8baa4.2ed418","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"trigger","payloadType":"str","x":270,"y":124,"wires":[["8f51fc27.7461a"]],"outputLabels":["msg = {payload:\"trigger\",topic:\"default\",control:true }"],"info":"msg = {\n payload:\"trigger\",\n topic:\"default\",\n control:true\n}"},{"id":"e2d5eba0.e82968","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"peek","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"peek","payloadType":"str","x":270,"y":284,"wires":[["8f51fc27.7461a"]]},{"id":"62b8ebe6.e34254","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"pause","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"pause","payloadType":"str","x":270,"y":164,"wires":[["8f51fc27.7461a"]],"outputLabels":["msg = { payload:\"pause\", topic:\"default\", control:true }"],"info":"msg = {\npayload:\"pause\",\ntopic:\"default\",\ncontrol:true\n}"},{"id":"2968d6c2.1c426a","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"resume","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"resume","payloadType":"str","x":270,"y":204,"wires":[["8f51fc27.7461a"]],"outputLabels":["payload:\"resume\" \\n topic:\"default\" \\n control:true"],"info":"msg = 
{\npayload:\"resume\",\ntopic:\"default\",\ncontrol:true\n}"},{"id":"524b3a66.c598fc","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"drop","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"drop","payloadType":"str","x":270,"y":324,"wires":[["8f51fc27.7461a"]]},{"id":"d475141f.f79e6","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"flush","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"flush","payloadType":"str","x":270,"y":244,"wires":[["8f51fc27.7461a"]]},{"id":"5c857d77.ef0034","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"flush","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"flush","payloadType":"str","x":770,"y":180,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"7bdcc2a.3d5bf3c","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"reset","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"reset","payloadType":"str","x":270,"y":364,"wires":[["8f51fc27.7461a"]]},{"id":"a34c033a.3c84c","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"reset","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"reset","payloadType":"str","x":770,"y":260,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"9413257a.f34068","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"maximum","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"},{"p":"value","v":"1","vt":"num"
}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"maximum","payloadType":"str","x":260,"y":404,"wires":[["8f51fc27.7461a"]]},{"id":"52ea570d.d57e08","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"status","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"status","payloadType":"str","x":270,"y":484,"wires":[["8f51fc27.7461a"]]},{"id":"7ee4d0ac.0358a","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"trigger","payloadType":"str","x":770,"y":140,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"26c05686.a92e92","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"input","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"","payloadType":"date","x":670,"y":480,"wires":[["7a7221f7.febf3"]]},{"id":"1d0498ae.977b07","type":"comment","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"default queue","info":"","x":250,"y":84,"wires":[]},{"id":"8f51fc27.7461a","type":"link out","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"","links":["7f658eda.a31058"],"x":455,"y":304,"wires":[]},{"id":"9a8b71f3.b3b6a8","type":"link out","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"","links":["7f658eda.a31058","10a5a17f.d199af"],"x":915,"y":200,"wires":[]},{"id":"6e0fa707.f6df7","type":"comment","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"queue #1","info":"","x":760,"y":100,"wires":[]},{"id":"7a7221f7.febf3","type":"link out","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"","links":["7f658eda.a31058"],"x":875,"y":480,"wires":[]},{"id":"7f658eda.a31058","type":"link 
in","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","links":["41a22656.ad437","7a7221f7.febf3","8f51fc27.7461a","9a8b71f3.b3b6a8"],"x":95,"y":580,"wires":[["24c8146a.437ab4"]]},{"id":"34529be4.051ac4","type":"comment","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"all queues","info":"","x":720,"y":400,"wires":[]},{"id":"d90cbf00.23cf","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"newest","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"},{"p":"value","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"newest","payloadType":"str","x":270,"y":444,"wires":[["8f51fc27.7461a"]]},{"id":"29489dd3.db6d2a","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"peek","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"peek","payloadType":"str","x":770,"y":220,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"a057816c.0028a8","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"trigger","payloadType":"str","x":730,"y":440,"wires":[["7a7221f7.febf3"]]},{"id":"98b3dc41.f34378","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"reset","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"reset","payloadType":"str","x":730,"y":520,"wires":[["7a7221f7.febf3"]]},{"id":"8005b7c4.45eb38","type":"debug","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":370,"y":580,"wires":[]},{"id":
"1890649a.137773","type":"status","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","scope":null,"x":200,"y":640,"wires":[["25632026.e297c8"]]},{"id":"25632026.e297c8","type":"debug","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"status.text","targetType":"msg","statusVal":"","statusType":"auto","x":360,"y":640,"wires":[]},{"id":"cd879e3a.5c237","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"status","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"status","payloadType":"str","x":730,"y":560,"wires":[["7a7221f7.febf3"]]},{"id":"24c8146a.437ab4","type":"m-queue","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"m-queue-demo","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":false,"outputs":1,"x":220,"y":580,"wires":[["8005b7c4.45eb38"]]},{"id":"54480c33.ce1c9c","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"protect","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"},{"p":"value","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"protect","payloadType":"str","x":770,"y":300,"wires":[["9a8b71f3.b3b6a8"]]}]
Node Status
This flow shows the default response to the status command and the optional use of a second output to obtain the complete status object for one or all queues.
[{"id":"a25c5984.bb3248","type":"tab","label":"m-queue-status","disabled":false,"info":""},{"id":"44b776c1.99b3b","type":"group","z":"a25c5984.bb3248","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["fff3933.09139f","2098ab05.a81144","ae212584.32f4a8","e07b4047.96a12","8e739fe6.11563","9b2f745c.baab3","4917d5dc.337fbc"],"x":146,"y":39,"w":592,"h":182},{"id":"95d4e6cc.92ee68","type":"group","z":"a25c5984.bb3248","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["3e4571de.1304ee","501b900a.a75008","b3cb0e71.5314a8","67f91541.df8e14","88fc842e.b843a","1c011b1d.17da95"],"x":166,"y":239,"w":552,"h":162},{"id":"fff3933.09139f","type":"inject","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"status","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"status","payloadType":"str","x":302,"y":160,"wires":[["4917d5dc.337fbc"]]},{"id":"2098ab05.a81144","type":"debug","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":602,"y":120,"wires":[]},{"id":"ae212584.32f4a8","type":"status","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"","scope":["4917d5dc.337fbc"],"x":452,"y":180,"wires":[["e07b4047.96a12"]]},{"id":"e07b4047.96a12","type":"debug","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"status.text","targetType":"msg","statusVal":"","statusType":"auto","x":612,"y":180,"wires":[]},{"id":"88fc842e.b843a","type":"debug","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusV
al":"","statusType":"auto","x":622,"y":300,"wires":[]},{"id":"1c011b1d.17da95","type":"debug","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":622,"y":340,"wires":[]},{"id":"8e739fe6.11563","type":"inject","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"input","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":242,"y":120,"wires":[["4917d5dc.337fbc"]]},{"id":"b3cb0e71.5314a8","type":"inject","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"status","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"status","payloadType":"str","x":322,"y":360,"wires":[["501b900a.a75008"]]},{"id":"67f91541.df8e14","type":"inject","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"input","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":262,"y":320,"wires":[["501b900a.a75008"]]},{"id":"9b2f745c.baab3","type":"inject","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"trigger","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"trigger","payloadType":"str","x":302,"y":80,"wires":[["4917d5dc.337fbc"]]},{"id":"3e4571de.1304ee","type":"inject","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"trigger","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"trigger","payloadType":"str","x":322,"y":280,"wires":[["501b900a.a75008"]]},{"id":"501b900a.a75008","type":"m-queue","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"queue 
#2","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":true,"outputs":2,"x":472,"y":320,"wires":[["88fc842e.b843a"],["1c011b1d.17da95"]]},{"id":"4917d5dc.337fbc","type":"m-queue","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"queue #1","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":false,"outputs":1,"x":452,"y":120,"wires":[["2098ab05.a81144"]]}]
Retain Until Processed
This flow uses the peek and drop commands in order to: release the oldest message without deleting it from the queue, process it (here simply waiting 5 seconds for demonstration purposes), then remove it from the head of the queue and release the next message. Processing stops once the queue is empty. It makes use of the Protect Queue feature to prevent empty queues from being deleted and producing warning messages.
[{"id":"6da8b2bf.e9aefc","type":"tab","label":"m-queue-retain","disabled":false,"info":""},{"id":"9d65188.ddc48e8","type":"inject","z":"6da8b2bf.e9aefc","name":"inject default","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"","payloadType":"date","x":190,"y":120,"wires":[["4706f6af.0460c8"]]},{"id":"a127c619.20b348","type":"delay","z":"6da8b2bf.e9aefc","name":"process","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":540,"y":180,"wires":[["77fe24e0.10287c","89fed29e.7fe85"]]},{"id":"1d3379cf.e4c5e6","type":"inject","z":"6da8b2bf.e9aefc","name":"process default","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"peek","payloadType":"str","x":180,"y":160,"wires":[["4706f6af.0460c8"]]},{"id":"6f059504.1bff7c","type":"link in","z":"6da8b2bf.e9aefc","name":"","links":["2362977b.f1fe08"],"x":235,"y":60,"wires":[["4706f6af.0460c8"]]},{"id":"77fe24e0.10287c","type":"debug","z":"6da8b2bf.e9aefc","name":"output","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":690,"y":220,"wires":[]},{"id":"89fed29e.7fe85","type":"function","z":"6da8b2bf.e9aefc","name":"get next","func":"let queue = msg.topic\nnode.send({topic: queue,control: true, payload: \"drop\"})\nnode.send({topic: queue,control: true, payload: \"peek\"})","outputs":1,"noerr":0,"initialize":"","finalize":"","x":700,"y":180,"wires":[["2362977b.f1fe08"]]},{"id":"2362977b.f1fe08","type":"link out","z":"6da8b2bf.e9aefc","name":"","links":["6f059504.1bff7c"],"x":815,"y":140,"wires":[]},{"id":"3b6a4ccd.a631f4","type":"inject","z":"6da8b2bf.e9aefc","name":"inject queue 
#1","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue #1","payload":"","payloadType":"date","x":180,"y":220,"wires":[["4706f6af.0460c8"]]},{"id":"f975a83b.70d32","type":"inject","z":"6da8b2bf.e9aefc","name":"process queue #1","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue #1","payload":"peek","payloadType":"str","x":170,"y":260,"wires":[["4706f6af.0460c8"]]},{"id":"4706f6af.0460c8","type":"m-queue","z":"6da8b2bf.e9aefc","name":"","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":true,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":false,"outputs":1,"x":400,"y":180,"wires":[["a127c619.20b348"]]}]
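For readability, the logic of the "get next" function node embedded in the flow above is reproduced here. In the flow, msg and node are supplied by Node-RED; the wrapper function getNext and the stub node object are added only so that the snippet runs on its own.

```javascript
// Body of the "get next" function node, wrapped for standalone use.
function getNext(msg, node) {
    const queue = msg.topic;
    // Remove the message that has just been processed from the head of its queue...
    node.send({ topic: queue, control: true, payload: "drop" });
    // ...then release the next message without removing it.
    node.send({ topic: queue, control: true, payload: "peek" });
}

// Stub standing in for the Node-RED node object: it records what is sent.
const sent = [];
getNext({ topic: "queue #1", payload: Date.now() }, { send: (m) => sent.push(m) });
```

Both control messages are looped back to the m-queue input, so each processed message triggers the release of its successor.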
Author
Mike Bell (drmike)