Command Design Pattern

This is a Node-RED-specific implementation of the command design pattern that helps keep editor flows transparent.

Advantages:

  • each UI input (or any source of input) communicates only with the dispatcher
  • each UI input (or any source of input) has a dedicated subflow within which its corresponding logic is implemented
  • each dedicated subflow is covered by a common catch node that handles the errors left unhandled in the subflow (and in any subflows embedded in it)
  • sequential input handling for normal priority messages to make processing synchronous
  • high priority message handling to keep asynchronous processing possible as well
  • the dispatcher builds a queue of the normal priority incoming inputs (this is the current implementation at least)
  • debugging gets easier as all messages pass through the dispatcher (switches can be easily added before the debug nodes to set a filter on the cmdObj properties to narrow down the number of debug messages)
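
The debug filter mentioned in the last point can also be expressed in code. The following is a minimal sketch, not part of the flow: matchesCmdFilter is a hypothetical helper that checks a message against a set of expected cmdObj property values, roughly what a switch node placed before a debug node would do.

```javascript
// Hypothetical helper (not part of the example flow): returns true when
// every key in `filter` has the same value in msg.cmdObj.
function matchesCmdFilter(msg, filter) {
    if (typeof msg.cmdObj !== "object" || msg.cmdObj === null) return false;
    return Object.keys(filter).every((key) => msg.cmdObj[key] === filter[key]);
}

const debugSample = {
    payload: 21,
    cmdObj: { priority: 1, linkIn: "numeric input", command: "calculate gauge data", linkOut: ["gauge", "numeric input"] }
};

console.log(matchesCmdFilter(debugSample, { command: "calculate gauge data" })); // true
console.log(matchesCmdFilter(debugSample, { priority: 0 }));                      // false
```

Inside a function node placed in front of a debug node, the same check could be used as `return matchesCmdFilter(msg, {command: "calculate gauge data"}) ? msg : null;` so that only the matching messages reach the debug sidebar.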

How it works:

The message sent to the dispatcher must carry a cmdObj property, a JSON object like the one set in the cmdSend node (see the example flow below):

msg.cmdObj={
    priority: 1,
    linkIn: "numeric input",
    command: "calculate gauge data",
    linkOut: ["gauge", "numeric input"]
};

priority: 1 is normal and 0 is high (any value other than 0 is treated as normal). High-priority messages pass through the dispatcher without being put in the queue.
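
The required shape of cmdObj and the priority rule can be sketched as two small checks. This is an illustration under assumptions: isValidCmdObj and isHighPriority are hypothetical names, and the null check is slightly stricter than the dispatcher in the example flow, which only tests that the four properties are defined.

```javascript
// Hypothetical helpers, written as plain JavaScript so they run outside Node-RED.

// The dispatcher only accepts a new command when all four properties are defined.
function isValidCmdObj(cmdObj) {
    return typeof cmdObj === "object" && cmdObj !== null &&
        typeof cmdObj.priority !== "undefined" &&
        typeof cmdObj.command !== "undefined" &&
        typeof cmdObj.linkIn !== "undefined" &&
        typeof cmdObj.linkOut !== "undefined";
}

// Only the number 0 means high priority; every other value is handled as normal.
function isHighPriority(cmdObj) {
    return cmdObj.priority === 0;
}

const sampleCmdObj = {
    priority: 1,
    linkIn: "numeric input",
    command: "calculate gauge data",
    linkOut: ["gauge", "numeric input"]
};

console.log(isValidCmdObj(sampleCmdObj));        // true
console.log(isHighPriority(sampleCmdObj));       // false
console.log(isHighPriority({ priority: "0" })); // false, the string "0" is not the number 0
```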

linkIn: identifies the sender

command: the command identifier, which also identifies the receiver. It is passed to the "link call" node as msg.target, so it must match the name or id of the receiving "link in" node.

linkOut: an array in which each element (passed as msg.target to the "link call" node) identifies a "link in" node to which a result shall be sent.

Subflows must return. (If a subflow does not return, the dispatcher cannot remove the message being processed from the queue and gets blocked.)

The payload returned by a subflow must be an array in which each element is treated as a payload to be delivered to the corresponding linkOut target. The elements of the linkOut array and the elements of the payload array returned by the command subflow are matched by index.

If a property other than the payload needs to be set in a returning message, the array element must be a JSON object that uses 'properties' and 'payload' as property names. Example with multiple linkOut targets and returning messages, taken from the 'calculate gauge data' subflow:

cmdObj.linkOut array:

linkOut: ["gauge", "numeric input"]

subflow returns:

msg.payload=[msg.payload*2, {properties:{enabled:true},payload:msg.payload}];

Cases when no linkOut is needed can be handled this way:

linkOut: []

subflow returns:

msg.payload=[];

Cases when no message is sent to a certain linkOut target can be handled by sending null:

linkOut: ["target1","target2"]

subflow returns:

msg.payload=["payload2target1",null];
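
The index matching described above can be sketched as a standalone function. This is a simplified illustration, not the dispatcher code itself: distribute and cloneMessage are hypothetical names, cloneMessage is a plain-JavaScript stand-in for RED.util.cloneMessage that only works for JSON-safe messages, and a missing array element is skipped in the same way as null.

```javascript
// Stand-in for RED.util.cloneMessage (assumption: the message is JSON-safe).
function cloneMessage(msg) {
    return JSON.parse(JSON.stringify(msg));
}

// Builds one outgoing message per linkOut target, matched by array index.
function distribute(msg, linkOut) {
    const out = [];
    linkOut.forEach((target, i) => {
        const item = msg.payload[i];
        if (item === null || item === undefined) return; // nothing for this target
        const outMsg = cloneMessage(msg);
        outMsg.cmdObj.linkOut = target;
        if (typeof item === "object" && typeof item.properties !== "undefined") {
            // {properties, payload} form: copy the extra properties onto the message
            Object.assign(outMsg, item.properties);
            outMsg.payload = item.payload;
        } else {
            outMsg.payload = item;
        }
        outMsg.target = target; // read by the "link call" node
        out.push(outMsg);
    });
    return out;
}

const returned = {
    payload: [42, { properties: { enabled: true }, payload: 21 }],
    cmdObj: { priority: 1, linkIn: "numeric input", command: "calculate gauge data", linkOut: ["gauge", "numeric input"] }
};
const results = distribute(returned, returned.cmdObj.linkOut);
console.log(results.map((m) => [m.target, m.payload, m.enabled]));

const skipped = distribute(
    { payload: ["payload2target1", null], cmdObj: { linkOut: ["target1", "target2"] } },
    ["target1", "target2"]
);
console.log(skipped.length); // 1, because the null element produces no message
```

In this sketch the first result carries payload 42 to "gauge", and the second carries payload 21 plus enabled=true to "numeric input".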

Dispatcher process:

  • the message generated by an input on the UI must be passed to the dispatcher together with the cmdObj property
  • the dispatcher either puts it in the queue if there's another message already being processed or releases it immediately
  • when a message is released it is sent to the "link call" node which directs it to the corresponding subflow based on its cmdObj.command property (which is copied to msg.target)
  • the subflow gets executed
  • when the subflow returns its message it is directed to the dispatcher
  • the dispatcher identifies the returning message, copies the msg payloads according to the linkOut property to separate messages and sends them to the linkOut targets via the "link call" node
  • if an error occurred in the subflow and was indicated in the cmdObj.error property of the returned message, it is copied to cmdObj.error of each outgoing message during copying
  • the dispatcher deletes the message from the queue, takes the next one and sends it to the "link call" node
  • if an unhandled exception occurs in any of the subflows, the message queue is reset so that the dispatcher does not block while waiting for the returning message. An alternative implementation could send a special message (like the reset) to the dispatcher to remove only the message currently being processed. That message is the first entry of the queue and is easy to obtain with the iterator's next() method, because the JavaScript Map preserves insertion order.
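
The queue handling in the steps above can be sketched with a plain Map. This is an illustration under assumptions, not the code of the dispatcher node: CommandQueue and its method names are hypothetical, and dropHead shows the alternative mentioned in the last point, where only the message in flight is removed instead of resetting the whole queue.

```javascript
// Hypothetical queue wrapper; a Map iterates in insertion order, so the
// first entry is always the message that is currently being processed.
class CommandQueue {
    constructor() {
        this.queue = new Map();
    }

    // Stores the message; returns true if it may be released immediately.
    enqueue(key, msg) {
        const wasEmpty = this.queue.size === 0;
        this.queue.set(key, msg);
        return wasEmpty;
    }

    // Called when a message returns; yields the next message to release, or null.
    complete(key) {
        const head = this.queue.keys().next();
        if (head.done || head.value !== key) return null; // not the message in flight
        this.queue.delete(key);
        const next = this.queue.values().next();
        return next.done ? null : next.value;
    }

    // Alternative to a full reset: remove only the message in flight.
    dropHead() {
        const head = this.queue.keys().next();
        if (head.done) return null;
        this.queue.delete(head.value);
        const next = this.queue.values().next();
        return next.done ? null : next.value;
    }

    reset() {
        this.queue.clear();
    }
}

const q = new CommandQueue();
const releasedA = q.enqueue("a", { command: "A" }); // true, queue was empty
const releasedB = q.enqueue("b", { command: "B" }); // false, "a" is in flight
const releasedC = q.enqueue("c", { command: "C" }); // false
const afterA = q.complete("a");  // { command: "B" } is released next
const afterDrop = q.dropHead();  // "b" failed and is dropped, { command: "C" } is next
console.log(releasedA, releasedB, afterA.command, afterDrop.command);
```

With dropHead, the commands queued behind a failing subflow still run, whereas a reset discards them.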

Best practices:

  • Either put a dispatcher of this kind on each editor tab that corresponds to a UI tab, to handle tab-specific events, or use a single dispatcher globally.
  • On each editor tab, use only one ui_control node as the source of incoming events, and handle those events with a command object through the dispatcher.

[{"id":"81eecec3.f857d","type":"subflow","name":"disable numeric input","info":"","category":"","in":[{"x":40,"y":40,"wires":[{"id":"30ae6a1d.ef635e"}]}],"out":[{"x":260,"y":40,"wires":[{"id":"30ae6a1d.ef635e","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"30ae6a1d.ef635e","type":"function","z":"81eecec3.f857d","name":"disable","func":"msg.payload=[{properties:{enabled:false},payload:msg.payload}];\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":150,"y":40,"wires":[[]]},{"id":"c8e83223.f69aa8","type":"subflow","name":"calculate gauge data","info":"","category":"","in":[{"x":60,"y":100,"wires":[{"id":"9b3ebf5.9c0c0c"}]}],"out":[{"x":420,"y":100,"wires":[{"id":"89f14f30.ebc718","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"89f14f30.ebc718","type":"function","z":"c8e83223.f69aa8","name":"calculate","func":"msg.payload=[msg.payload*2, {properties:{enabled:true},payload:msg.payload}];\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":320,"y":100,"wires":[[]]},{"id":"9b3ebf5.9c0c0c","type":"delay","z":"c8e83223.f69aa8","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"outputs":1,"x":180,"y":100,"wires":[["89f14f30.ebc718"]]},{"id":"e4078580609f246c","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"da60e6734a1c8501","type":"function","z":"e4078580609f246c","name":"dispatcher","func":"if(msg.payload===\"reset\"){\n    let msgQueue=new Map();\n    context.set(\"msgQueue\",msgQueue);\n}\n\nlet msgQueue=context.get(\"msgQueue\");\nif(msgQueue===undefined) msgQueue=new Map();\nif(typeof msg.cmdObj!==\"undefined\"){\n    //node.warn(\"cmdObj\");\n    if(typeof msg.cmdObj.timestamp!==\"undefined\"){\n        //node.warn(\"old:\"+msg.cmdObj.timestamp);\n        //node.warn(\"old size:\"+msgQueue.size);\n        if(msg.cmdObj.priority===0){\n            delete 
msg.cmdObj.timestamp;\n            for(let i=0;i<msg.cmdObj.linkOut.length;++i){\n                //high prio msgs are not queued, so the returned msg must preserve the cmdObj as it was\n                let returnAsyncMsg=RED.util.cloneMessage(msg);\n                returnAsyncMsg.cmdObj.linkOut=msg.cmdObj.linkOut[i];\n                if(msg.payload[i]===null) returnAsyncMsg=null;\n                else{\n                    if(typeof msg.payload[i].properties!==\"undefined\"){\n                        for(let j in msg.payload[i].properties){\n                            returnAsyncMsg[j]=msg.payload[i].properties[j];\n                        }\n                        returnAsyncMsg.payload=msg.payload[i].payload;\n                    }\n                    else{\n                        returnAsyncMsg.payload=msg.payload[i];\n                    }\n                    if(typeof msg.cmdObj.error!==\"undefined\") returnAsyncMsg.cmdObj.error=msg.cmdObj.error;\n                    returnAsyncMsg.target = msg.cmdObj.linkOut[i];\n                }\n                //node.warn(returnAsyncMsg);\n                node.send(returnAsyncMsg);\n            }\n            return null;\n        }\n        else{\n            let keys=msgQueue.keys();\n            let returnKey=keys.next();\n            if(returnKey.done===false&&returnKey.value===msg.cmdObj.timestamp){\n                delete msg.cmdObj.timestamp;\n                let values=msgQueue.values();\n                let returnMsg=values.next().value;\n                //node.warn(\"returnKey:\"+returnKey.value+\", command:\"+msg.cmdObj.command);\n                 for(let i=0;i<returnMsg.cmdObj.linkOut.length;++i){\n                    let returnAsyncMsg=RED.util.cloneMessage(msg);\n                    returnAsyncMsg.cmdObj.linkOut=returnMsg.cmdObj.linkOut[i];\n                    if(msg.payload[i]===null) returnAsyncMsg=null;\n                    else{\n                        if(typeof 
msg.payload[i].properties!==\"undefined\"){\n                            for(let j in msg.payload[i].properties){\n                                returnAsyncMsg[j]=msg.payload[i].properties[j];\n                            }\n                            returnAsyncMsg.payload=msg.payload[i].payload;\n                        }\n                        else{\n                            returnAsyncMsg.payload=msg.payload[i];\n                        }\n                        if(typeof msg.cmdObj.error!==\"undefined\") returnAsyncMsg.cmdObj.error=msg.cmdObj.error;\n                        returnAsyncMsg.target = msg.cmdObj.linkOut[i];\n                    }\n                    //node.warn(returnAsyncMsg);\n                    node.send(returnAsyncMsg);\n                }\n                msgQueue.delete(returnKey.value);\n                context.set(\"msgQueue\",msgQueue);\n                //node.warn(\"re-size:\"+msgQueue.size);\n                let nextMsg=null;\n                let nextKey=keys.next();\n                if(nextKey.done===false){\n                    nextMsg=msgQueue.values().next().value;\n                }\n                //node.warn(nextMsg);\n                return nextMsg;\n            }\n            else return null;\n        }\n    }\n    else if(typeof msg.cmdObj.priority!==\"undefined\" &&\n            typeof msg.cmdObj.command!==\"undefined\" &&\n            typeof msg.cmdObj.linkIn!==\"undefined\" &&\n            typeof msg.cmdObj.linkOut!==\"undefined\"){\n            msg.target = msg.cmdObj.command;\n            if(msg.cmdObj.priority===0){\n                let timestamp=Date.now().toString()+Math.random();\n                msg.cmdObj.timestamp=timestamp;\n                return msg;\n            }\n            else{\n                if(msgQueue.size===0){\n                    let timestamp=Date.now().toString()+Math.random();\n                    msg.cmdObj.timestamp=timestamp;\n                    //node.warn(\"released 
new:\"+timestamp+\", command:\"+msg.cmdObj.command);\n                    msgQueue.set(timestamp,RED.util.cloneMessage(msg));\n                    //node.warn(\"new size:\"+msgQueue.size);\n                    context.set(\"msgQueue\",msgQueue);\n                    return msg;\n                }\n                else{\n                    let timestamp=Date.now().toString()+Math.random();\n                    msg.cmdObj.timestamp=timestamp;\n                    //node.warn(\"new:\"+timestamp+\", command:\"+msg.cmdObj.command);\n                    msgQueue.set(timestamp,RED.util.cloneMessage(msg));\n                    //node.warn(\"new size:\"+msgQueue.size);\n                    context.set(\"msgQueue\",msgQueue);\n                    return null;\n                }\n            }\n    }\n}\nelse{\n    //node.warn(\"no cmdObj\");\n    return null;\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":600,"y":200,"wires":[["fe17a898116a2988","78cd2c85bdc093dc"]],"outputLabels":["nextMsg"]},{"id":"c54ab33c23bd95c4","type":"link in","z":"e4078580609f246c","name":"dispatcher","links":["a58a6f01fb97364c","f9996d82232cb887"],"x":465,"y":200,"wires":[["da60e6734a1c8501","fe866f6b90345878"]]},{"id":"a0872d8d8e4cb0a8","type":"inject","z":"e4078580609f246c","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":true,"onceDelay":"0","topic":"","payload":"reset","payloadType":"str","x":330,"y":300,"wires":[["7627cb006ae3a313"]]},{"id":"f9242b4fb52719da","type":"catch","z":"e4078580609f246c","name":"Catch dispatcher subflow errors","scope":["5794ea4ad4d29da6","7bc5c66b69da52e4"],"uncaught":false,"x":250,"y":260,"wires":[["7627cb006ae3a313"]]},{"id":"7627cb006ae3a313","type":"function","z":"e4078580609f246c","name":"reset","func":"msg.payload=\"reset\";\nreturn 
msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":450,"y":260,"wires":[["da60e6734a1c8501"]]},{"id":"fe17a898116a2988","type":"debug","z":"e4078580609f246c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":750,"y":140,"wires":[]},{"id":"900d964fed69f4bf","type":"function","z":"e4078580609f246c","name":"cmdSend","func":"let uiMsg={};\nuiMsg.cmdObj={\n    priority: 0,\n    linkIn: \"numeric input\",\n    command: \"disable numeric input\",\n    linkOut: [\"numeric input\"]\n}\n\nmsg.cmdObj={\n    priority: 1,\n    linkIn: \"numeric input\",\n    command: \"calculate gauge data\",\n    linkOut: [\"gauge\", \"numeric input\"]\n}\nreturn [uiMsg, msg];\n","outputs":2,"noerr":0,"initialize":"","finalize":"","libs":[],"x":310,"y":100,"wires":[["a58a6f01fb97364c"],["f9996d82232cb887"]]},{"id":"f9996d82232cb887","type":"link out","z":"e4078580609f246c","name":"","links":["c54ab33c23bd95c4"],"x":425,"y":120,"wires":[]},{"id":"7c0d06ef8410105a","type":"ui_gauge","z":"e4078580609f246c","name":"","group":"7bae444b.8c2554","order":1,"width":0,"height":0,"gtype":"gage","title":"gauge","label":"units","format":"{{value}}","min":0,"max":"100","colors":["#00b500","#e6e600","#ca3838"],"seg1":"","seg2":"","x":1010,"y":100,"wires":[]},{"id":"5794ea4ad4d29da6","type":"subflow:c8e83223.f69aa8","z":"e4078580609f246c","name":"","env":[],"x":680,"y":320,"wires":[["655c3be9e0abf9ee"]]},{"id":"58f7134d611fb652","type":"link in","z":"e4078580609f246c","name":"calculate gauge data","links":["e9e3252c18260cf1"],"x":525,"y":320,"wires":[["5794ea4ad4d29da6"]]},{"id":"655c3be9e0abf9ee","type":"link out","z":"e4078580609f246c","name":"calculate gaugeData out","mode":"return","links":[],"x":825,"y":320,"wires":[]},{"id":"478db4380a2c3403","type":"link 
in","z":"e4078580609f246c","name":"gauge","links":["625a584021e7b4ed"],"x":915,"y":100,"wires":[["7c0d06ef8410105a"]]},{"id":"897d5d215b648d9a","type":"link in","z":"e4078580609f246c","name":"numeric input","links":["bdbf3d2fc48a5c6b","e52f589404c265ea"],"x":65,"y":100,"wires":[["40c305624725e662"]]},{"id":"40c305624725e662","type":"ui_numeric","z":"e4078580609f246c","name":"","label":"numeric","tooltip":"","group":"7bae444b.8c2554","order":1,"width":0,"height":0,"wrap":false,"passthru":false,"topic":"","format":"","min":0,"max":"100","step":1,"x":170,"y":100,"wires":[["900d964fed69f4bf"]]},{"id":"fe866f6b90345878","type":"debug","z":"e4078580609f246c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":590,"y":140,"wires":[]},{"id":"a58a6f01fb97364c","type":"link out","z":"e4078580609f246c","name":"","links":["c54ab33c23bd95c4"],"x":425,"y":80,"wires":[]},{"id":"7bc5c66b69da52e4","type":"subflow:81eecec3.f857d","z":"e4078580609f246c","name":"","env":[],"x":680,"y":360,"wires":[["b776acaddad24ca0"]]},{"id":"b1b8a612937f7083","type":"link in","z":"e4078580609f246c","name":"disable numeric input","links":["6af457a58459a10c"],"x":525,"y":360,"wires":[["7bc5c66b69da52e4"]]},{"id":"b776acaddad24ca0","type":"link out","z":"e4078580609f246c","name":"disable numeric input out","mode":"return","links":[],"x":825,"y":360,"wires":[]},{"id":"78cd2c85bdc093dc","type":"link call","z":"e4078580609f246c","name":"link call","links":[],"linkType":"dynamic","timeout":"30","x":760,"y":200,"wires":[["da60e6734a1c8501"]]},{"id":"7bae444b.8c2554","type":"ui_group","name":"Default","tab":"5e1e8043.811fc8","order":1,"disp":true,"width":"6","collapse":false},{"id":"5e1e8043.811fc8","type":"ui_tab","name":"Home","icon":"dashboard","disabled":false,"hidden":false}]

Flow Info

Created 3 years, 6 months ago
Updated 1 year, 5 months ago

Node Types

Core
  • catch (x1)
  • debug (x2)
  • delay (x1)
  • function (x5)
  • inject (x1)
  • link call (x1)
  • link in (x5)
  • link out (x4)
Other
  • subflow (x2)
  • subflow:81eecec3.f857d (x1)
  • subflow:c8e83223.f69aa8 (x1)
  • tab (x1)
  • ui_gauge (x1)
  • ui_group (x1)
  • ui_numeric (x1)
  • ui_tab (x1)

Tags

  • command
  • design
  • pattern
  • principle
Copy this flow JSON to your clipboard and then import into Node-RED using the Import From > Clipboard (Ctrl-I) menu option