node-red-contrib-multiple-queue 0.2.1

A multiple message queue node for node-RED

npm install node-red-contrib-multiple-queue

node-red-contrib-multiple-queue

A Node-RED node providing multiple, independently controlled message queues

Install

Use the Node-RED Manage Palette command or run the following command in the Node-RED user directory (typically ~/.node-red):

npm install node-red-contrib-multiple-queue

Design

The m-queue node provides multiple, independently controlled message queues. Each queue behaves similarly to the queueing state of the node-red-contrib-queue-gate node. The requirement to control each queue individually, however, adds some complexity to the design, and this is reflected in the control mechanism. Queues are created as needed and destroyed when empty, except for the Default Queue (see below). Each queue is identified by a Name string, which is used to steer messages to it, where they are stored or used to control the queue. This name is matched against the Queue Selector string provided by default in the msg.topic property. The user can, however, select any other message property for this purpose, thus retaining flexibility. Similarly, all the other message properties used to control the queues are defined by default but are available to be customized in the node edit dialog.

Usage

Queueing messages

An incoming message is added to the queue identified by the Queue Selector. If no such queue exists, one is created. If the Queue Selector is undefined or an empty string, the message is added to the Default Queue, which is always available. A message to be queued or a control message (see below) can sent to all queues by setting the Queue Selector to the value defined as All Queues.

Controlling message queues

Messages with the user-defined property Control Flag set to true (or a JavaScript equivalent) are not queued but are used to control the queues. These control messages can have payloads (case-insensitive strings) that the user has defined to represent commands for trigger, pause, resume,flush, peek, drop, reset, status, maximum, newest, protect, and delete. If a control message is received with a payload that is a number or boolean, the payload is converted to a string and then tested against the command definitions. If a control message is received but not recognized, there is no output or change of state, and the node issues a warning.

Each queue responds independently to control messages addressed to it by performing the following actions:

If a queue has been paused by the pause command, it will no longer queue incoming messages or accept most commands. Only commands that do not affect the contents of a queue will be executed while it is paused. These are indicated by an asterisk in the table above.

By default, messages arriving when a queue is full are discarded, so that the queue contains the oldest messages. The user can, however, set the Keep newest messages checkbox in order to have the Default Queue and all new queues created with the opposite behavior: new messages are added to the queue (at the tail), while discarding the oldest message (from the head), with the result that the queue contains the most recent messages. This behavior can be controlled on an individual queue basis by addressing a newest control message to that queue with the value property set to true or false. If the value is undefined, the Keep newest messages property will be set to its default. Changes to the Keep newest messages property do not affect messages already in the queue.

Memory Management

To prevent excessive memory usage, the node is configured by default to limit the size of message queues and to delete queues when they are empty. The Default Queue and all new queues will be created with the limit specified in the edit dialog, which can be changed by addressing a maximum command to one or all queues and providing the maximum value in the value property. There may be use cases (see the Retain Until Processed example below) where it is helpful to override these defaults. The queue size limit is eliminated by entering a Maximum Queue Size value of zero or less. A queue will not be deleted when empty if its Protect attribute is set to true. These values can be made the default for all queues in the edit dialog or changed for individual queues by using the maximum and protect commands. If the value property for either command is undefined, the default setting will be used for the queue. Changes to the maximum queue size do not affect messages already in the queue. Note that the Default Queue is created with its Protect attribute set to true, which cannot be modified. Hence, this queue can never be deleted.

Despite these precautions, it is possible to consume excessive amounts of memory if messages arrive with new (previously unseen) values of the Queue Selector, thus generating very many queues. The user is advised to monitor the number of active queues indicated by the node status text (see below) and to take appropriate action if it becomes unexpectedly large.

Node status

A limited indication of node activity is provided by the status text, which displays the number of queues in operation and the total number of messages queued. When a status command is received, the status text is refreshed so that it can be caught by a status node. In addition, if the Send status to second output checkbox is selected in the edit dialog, the node is deployed with a second output that sends a complete description of the selected queue(s) as an object of the form:

This is shown in the second flow in the *Node Status* example below.

Persistence

The state of the node (all queues and their contents) is maintained in the node context. If a persistent (non-volatile) form of context storage is available, the user has the option of restoring the state from that storage after a restart of Node-RED. This is done by activating the Restore from state saved in option (checkbox) in the edit dialog and choosing a non-volatile storage module from the adjacent dropdown list, which shows all the storage modules enabled in the Node-RED settings.js file.

Examples

Basic Operation

This flow demonstrates the basic operation of the m-queue node and the commands that can be used to change or display its state or manage the queue.

[{"id":"75a2f227.f318d4","type":"tab","label":"m-queue-demo","disabled":false,"info":""},{"id":"2adbd30f.24601c","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["26c05686.a92e92","7a7221f7.febf3","34529be4.051ac4","a057816c.0028a8","98b3dc41.f34378","cd879e3a.5c237"],"x":574,"y":359,"w":342,"h":242},{"id":"5f7860c0.3d0d88","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["7f658eda.a31058","8005b7c4.45eb38","1890649a.137773","25632026.e297c8","24c8146a.437ab4"],"x":54,"y":539,"w":432,"h":142},{"id":"d73c5c87.a36288","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["7331f684.0d42e8","9a8b71f3.b3b6a8","5c857d77.ef0034","a34c033a.3c84c","7ee4d0ac.0358a","29489dd3.db6d2a","54480c33.ce1c9c","6e0fa707.f6df7"],"x":514,"y":59,"w":442,"h":282},{"id":"f65a40d2.c66fd","type":"group","z":"75a2f227.f318d4","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["2e81b27e.38c676","d4a8baa4.2ed418","e2d5eba0.e82968","62b8ebe6.e34254","2968d6c2.1c426a","524b3a66.c598fc","d475141f.f79e6","7bdcc2a.3d5bf3c","9413257a.f34068","52ea570d.d57e08","1d0498ae.977b07","8f51fc27.7461a","d90cbf00.23cf"],"x":34,"y":43,"w":462,"h":482},{"id":"2e81b27e.38c676","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"input","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"","payloadType":"date","x":130,"y":304,"wires":[["8f51fc27.7461a"]]},{"id":"7331f684.0d42e8","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"input","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"","payloadType":"date","x":610,"y":200,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"d4a8baa4.2ed418","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"trigger","payloadType":"str","x":270,"y":124,"wires":[["8f51fc27.7461a"]],"outputLabels":["msg = {payload:\"trigger\",topic:\"default\",control:true }"],"info":"msg = {\n    payload:\"trigger\",\n    topic:\"default\",\n    control:true\n}"},{"id":"e2d5eba0.e82968","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"peek","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"peek","payloadType":"str","x":270,"y":284,"wires":[["8f51fc27.7461a"]]},{"id":"62b8ebe6.e34254","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"pause","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"pause","payloadType":"str","x":270,"y":164,"wires":[["8f51fc27.7461a"]],"outputLabels":["msg = { payload:\"pause\", topic:\"default\", control:true }"],"info":"msg = {\npayload:\"pause\",\ntopic:\"default\",\ncontrol:true\n}"},{"id":"2968d6c2.1c426a","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"resume","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"resume","payloadType":"str","x":270,"y":204,"wires":[["8f51fc27.7461a"]],"outputLabels":["payload:\"resume\" \\n topic:\"default\" \\n control:true"],"info":"msg = {\npayload:\"resume\",\ntopic:\"default\",\ncontrol:true\n}"},{"id":"524b3a66.c598fc","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"drop","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"drop","payloadType":"str","x":270,"y":324,"wires":[["8f51fc27.7461a"]]},{"id":"d475141f.f79e6","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"flush","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"flush","payloadType":"str","x":270,"y":244,"wires":[["8f51fc27.7461a"]]},{"id":"5c857d77.ef0034","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"flush","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"flush","payloadType":"str","x":770,"y":180,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"7bdcc2a.3d5bf3c","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"reset","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"reset","payloadType":"str","x":270,"y":364,"wires":[["8f51fc27.7461a"]]},{"id":"a34c033a.3c84c","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"reset","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"reset","payloadType":"str","x":770,"y":260,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"9413257a.f34068","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"maximum","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"},{"p":"value","v":"1","vt":"num"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"maximum","payloadType":"str","x":260,"y":404,"wires":[["8f51fc27.7461a"]]},{"id":"52ea570d.d57e08","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"status","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"status","payloadType":"str","x":270,"y":484,"wires":[["8f51fc27.7461a"]]},{"id":"7ee4d0ac.0358a","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"trigger","payloadType":"str","x":770,"y":140,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"26c05686.a92e92","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"input","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"","payloadType":"date","x":670,"y":480,"wires":[["7a7221f7.febf3"]]},{"id":"1d0498ae.977b07","type":"comment","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"default queue","info":"","x":250,"y":84,"wires":[]},{"id":"8f51fc27.7461a","type":"link out","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"","links":["7f658eda.a31058"],"x":455,"y":304,"wires":[]},{"id":"9a8b71f3.b3b6a8","type":"link out","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"","links":["7f658eda.a31058","10a5a17f.d199af"],"x":915,"y":200,"wires":[]},{"id":"6e0fa707.f6df7","type":"comment","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"queue #1","info":"","x":760,"y":100,"wires":[]},{"id":"7a7221f7.febf3","type":"link out","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"","links":["7f658eda.a31058"],"x":875,"y":480,"wires":[]},{"id":"7f658eda.a31058","type":"link in","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","links":["41a22656.ad437","7a7221f7.febf3","8f51fc27.7461a","9a8b71f3.b3b6a8"],"x":95,"y":580,"wires":[["24c8146a.437ab4"]]},{"id":"34529be4.051ac4","type":"comment","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"all queues","info":"","x":720,"y":400,"wires":[]},{"id":"d90cbf00.23cf","type":"inject","z":"75a2f227.f318d4","g":"f65a40d2.c66fd","name":"newest","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"},{"p":"value","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"newest","payloadType":"str","x":270,"y":444,"wires":[["8f51fc27.7461a"]]},{"id":"29489dd3.db6d2a","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"peek","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"peek","payloadType":"str","x":770,"y":220,"wires":[["9a8b71f3.b3b6a8"]]},{"id":"a057816c.0028a8","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"trigger","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"trigger","payloadType":"str","x":730,"y":440,"wires":[["7a7221f7.febf3"]]},{"id":"98b3dc41.f34378","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"reset","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"reset","payloadType":"str","x":730,"y":520,"wires":[["7a7221f7.febf3"]]},{"id":"8005b7c4.45eb38","type":"debug","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":370,"y":580,"wires":[]},{"id":"1890649a.137773","type":"status","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","scope":null,"x":200,"y":640,"wires":[["25632026.e297c8"]]},{"id":"25632026.e297c8","type":"debug","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"status.text","targetType":"msg","statusVal":"","statusType":"auto","x":360,"y":640,"wires":[]},{"id":"cd879e3a.5c237","type":"inject","z":"75a2f227.f318d4","g":"2adbd30f.24601c","name":"status","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"status","payloadType":"str","x":730,"y":560,"wires":[["7a7221f7.febf3"]]},{"id":"24c8146a.437ab4","type":"m-queue","z":"75a2f227.f318d4","g":"5f7860c0.3d0d88","name":"m-queue-demo","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":false,"outputs":1,"x":220,"y":580,"wires":[["8005b7c4.45eb38"]]},{"id":"54480c33.ce1c9c","type":"inject","z":"75a2f227.f318d4","g":"d73c5c87.a36288","name":"protect","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"},{"p":"value","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue1","payload":"protect","payloadType":"str","x":770,"y":300,"wires":[["9a8b71f3.b3b6a8"]]}]

Node Status

This flow shows the default response to the status command and the optional use of a second output to obtain the complete status object for one or all queues.

[{"id":"a25c5984.bb3248","type":"tab","label":"m-queue-status","disabled":false,"info":""},{"id":"44b776c1.99b3b","type":"group","z":"a25c5984.bb3248","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["fff3933.09139f","2098ab05.a81144","ae212584.32f4a8","e07b4047.96a12","8e739fe6.11563","9b2f745c.baab3","4917d5dc.337fbc"],"x":146,"y":39,"w":592,"h":182},{"id":"95d4e6cc.92ee68","type":"group","z":"a25c5984.bb3248","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["3e4571de.1304ee","501b900a.a75008","b3cb0e71.5314a8","67f91541.df8e14","88fc842e.b843a","1c011b1d.17da95"],"x":166,"y":239,"w":552,"h":162},{"id":"fff3933.09139f","type":"inject","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"status","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"status","payloadType":"str","x":302,"y":160,"wires":[["4917d5dc.337fbc"]]},{"id":"2098ab05.a81144","type":"debug","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":602,"y":120,"wires":[]},{"id":"ae212584.32f4a8","type":"status","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"","scope":["4917d5dc.337fbc"],"x":452,"y":180,"wires":[["e07b4047.96a12"]]},{"id":"e07b4047.96a12","type":"debug","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"status.text","targetType":"msg","statusVal":"","statusType":"auto","x":612,"y":180,"wires":[]},{"id":"88fc842e.b843a","type":"debug","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":622,"y":300,"wires":[]},{"id":"1c011b1d.17da95","type":"debug","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":622,"y":340,"wires":[]},{"id":"8e739fe6.11563","type":"inject","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"input","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":242,"y":120,"wires":[["4917d5dc.337fbc"]]},{"id":"b3cb0e71.5314a8","type":"inject","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"status","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"status","payloadType":"str","x":322,"y":360,"wires":[["501b900a.a75008"]]},{"id":"67f91541.df8e14","type":"inject","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"input","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":262,"y":320,"wires":[["501b900a.a75008"]]},{"id":"9b2f745c.baab3","type":"inject","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"trigger","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"trigger","payloadType":"str","x":302,"y":80,"wires":[["4917d5dc.337fbc"]]},{"id":"3e4571de.1304ee","type":"inject","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"trigger","props":[{"p":"payload"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"trigger","payloadType":"str","x":322,"y":280,"wires":[["501b900a.a75008"]]},{"id":"501b900a.a75008","type":"m-queue","z":"a25c5984.bb3248","g":"95d4e6cc.92ee68","name":"queue #2","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":true,"outputs":2,"x":472,"y":320,"wires":[["88fc842e.b843a"],["1c011b1d.17da95"]]},{"id":"4917d5dc.337fbc","type":"m-queue","z":"a25c5984.bb3248","g":"44b776c1.99b3b","name":"queue #1","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":false,"outputs":1,"x":452,"y":120,"wires":[["2098ab05.a81144"]]}]

Retain Until Processed

This flow uses the peek and drop commands in order to: release the oldest message without deleting it from the queue, process it (here simply waiting 5 seconds for demonstration purposes), then remove it from the head of the queue and release the next message. Processing stops once the queue is empty. It makes use of the Protect Queue feature to prevent empty queues from being deleted and producing warning messages.

[{"id":"6da8b2bf.e9aefc","type":"tab","label":"m-queue-retain","disabled":false,"info":""},{"id":"9d65188.ddc48e8","type":"inject","z":"6da8b2bf.e9aefc","name":"inject default","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"","payloadType":"date","x":190,"y":120,"wires":[["4706f6af.0460c8"]]},{"id":"a127c619.20b348","type":"delay","z":"6da8b2bf.e9aefc","name":"process","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":540,"y":180,"wires":[["77fe24e0.10287c","89fed29e.7fe85"]]},{"id":"1d3379cf.e4c5e6","type":"inject","z":"6da8b2bf.e9aefc","name":"process default","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"default","payload":"peek","payloadType":"str","x":180,"y":160,"wires":[["4706f6af.0460c8"]]},{"id":"6f059504.1bff7c","type":"link in","z":"6da8b2bf.e9aefc","name":"","links":["2362977b.f1fe08"],"x":235,"y":60,"wires":[["4706f6af.0460c8"]]},{"id":"77fe24e0.10287c","type":"debug","z":"6da8b2bf.e9aefc","name":"output","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":690,"y":220,"wires":[]},{"id":"89fed29e.7fe85","type":"function","z":"6da8b2bf.e9aefc","name":"get next","func":"let queue = msg.topic\nnode.send({topic: queue,control: true, payload: \"drop\"})\nnode.send({topic: queue,control: true, payload: \"peek\"})","outputs":1,"noerr":0,"initialize":"","finalize":"","x":700,"y":180,"wires":[["2362977b.f1fe08"]]},{"id":"2362977b.f1fe08","type":"link out","z":"6da8b2bf.e9aefc","name":"","links":["6f059504.1bff7c"],"x":815,"y":140,"wires":[]},{"id":"3b6a4ccd.a631f4","type":"inject","z":"6da8b2bf.e9aefc","name":"inject queue #1","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue #1","payload":"","payloadType":"date","x":180,"y":220,"wires":[["4706f6af.0460c8"]]},{"id":"f975a83b.70d32","type":"inject","z":"6da8b2bf.e9aefc","name":"process queue #1","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"queue #1","payload":"peek","payloadType":"str","x":170,"y":260,"wires":[["4706f6af.0460c8"]]},{"id":"4706f6af.0460c8","type":"m-queue","z":"6da8b2bf.e9aefc","name":"","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":false,"maxSizeDefault":100,"protectDefault":true,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":false,"outputs":1,"x":400,"y":180,"wires":[["a127c619.20b348"]]}]

Author

Mike Bell (drmike)

Node Info

Version: 0.2.1
Updated 4 years, 1 month ago
License: Apache-2.0
Rating: not yet rated

Categories

Actions

Rate:

Downloads

23 in the last week

Nodes

  • m-queue

Keywords

  • node-red
  • queue
  • flow control

Maintainers