Split Join Parallelized Execution Pattern

This is a template subflow for parallelizing the execution of a function or process over the provided input data (Single Program Multiple Data pattern). The subflow takes the incoming message and splits its input rows into chunks whose size is given by msg.payload.value.splitsize. Each chunk is forwarded as input to one of three execution modes, selected by msg.payload.value.execution as one of "faas", "local_multithread" or "local_multiprocess":

  • "faas": an OpenWhisk action (whose name is given in msg.payload.value.action),
  • "local_multithread": a local thread, or
  • "local_multiprocess": a newly spawned local process (the script to execute is given in msg.payload.value.shellscript).
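In the flow, the routing is done by a switch node on msg.execution. As a minimal sketch (not the flow's actual implementation), the same decision can be written as the body of a Node-RED function node configured with three outputs, where returning an array sends each element to the output with the same index and null sends nothing. The helper name routeByExecution is hypothetical.

```javascript
// Hypothetical helper mirroring the subflow's switch node:
// output 0 -> OpenWhisk action, 1 -> mp-function threads, 2 -> exec branch.
const EXECUTION_MODES = ["faas", "local_multithread", "local_multiprocess"];

function routeByExecution(msg) {
  const outputs = [null, null, null];
  const index = EXECUTION_MODES.indexOf(msg.execution);
  if (index !== -1) {
    outputs[index] = msg; // only the matching output receives the message
  }
  // Unknown modes leave every output null, so the message is dropped,
  // just as a switch node drops messages that match no rule.
  return outputs;
}

// Inside a Node-RED function node with 3 outputs:
// return routeByExecution(msg);
```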

[Figure: Split and Join pattern diagram]

For example, if an array of 1000 rows is passed in msg.payload.value.values and splitsize is set to 10, the subflow makes 100 inner invocations of the action named in msg.payload.value.action when execution is set to "faas". In this case the OpenWhisk action node must also be configured with the credentials and endpoint of the available OpenWhisk instance.
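A minimal sketch of the chunking step, modelled on the subflow's "array chunks" function node; the name chunkRows is hypothetical. It uses slice instead of the node's in-place splice, and it rejects non-positive or non-numeric sizes because, with a splitsize of 0 (or NaN), a splice-based loop removes nothing and never terminates.

```javascript
// Split `rows` into consecutive chunks of at most `splitsize` rows,
// mirroring the "array chunks" node but without mutating the input array.
function chunkRows(rows, splitsize = 1) {
  const size = Number(splitsize);
  // A size of 0 or NaN would make a splice-based loop spin forever.
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("splitsize must be a positive integer");
  }
  const chunks = [];
  for (let start = 0; start < rows.length; start += size) {
    chunks.push(rows.slice(start, start + size));
  }
  return chunks;
}

// 1000 rows with splitsize 10 -> 100 chunks, i.e. 100 inner invocations.
const rows = Array.from({ length: 1000 }, (_, i) => ({ id: i }));
const chunks = chunkRows(rows, 10);
console.log(chunks.length); // 100
```

With 1005 rows and the same splitsize, the last chunk would hold the remaining 5 rows, giving 101 invocations.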

Passing the split size in msg.payload.value.splitsize was chosen because the properties of the Split node only accept environment variables for setting the split size. These are set only at startup and therefore cannot be used to manage the pattern dynamically at runtime. The dynamic setting also means that, when this subflow is wrapped behind an HTTP endpoint, splitsize can simply be one of the request's input arguments.
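As an illustration of passing splitsize per request, here is a minimal sketch of the logic for a function node placed between a hypothetical "http in" node and the subflow. It assumes the request body is JSON with a values array and that splitsize and execution arrive as query parameters; these HTTP-side names and the helper buildSubflowPayload are illustrative, not part of the original flow.

```javascript
// Hypothetical adapter: turns an HTTP request (JSON body + query string) into
// the msg.payload.value layout the SplitJoinMultiple subflow expects.
function buildSubflowPayload(body, query) {
  const value = { values: Array.isArray(body.values) ? body.values : [] };
  // Only forward parameters the caller actually supplied (and that are valid),
  // so the subflow's own defaults still apply otherwise.
  const size = Number.parseInt(query.splitsize, 10);
  if (Number.isInteger(size) && size > 0) {
    value.splitsize = size;
  }
  if (query.execution !== undefined) {
    value.execution = String(query.execution);
  }
  return { value };
}

// In a function node after "http in" (msg.req.query holds the query string):
// msg.payload = buildSubflowPayload(msg.payload, msg.req.query); return msg;
```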

Note, however, that because each invocation now receives a chunk rather than a single row, the function executed in the inner parallelized action must accept an array as input.
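The "array to object" node wraps each chunk as { input: [...] } before it is sent on, so an inner OpenWhisk action receives a list of rows in params.input instead of a single row. A minimal sketch of a Node.js action written for that shape; the per-row work (summing numeric rows) and the results field name are placeholders, not part of the original flow.

```javascript
// Placeholder per-row computation: sum the numbers in one row.
function processRow(row) {
  return row.reduce((acc, x) => acc + x, 0);
}

// OpenWhisk Node.js action entry point: params.input is the whole chunk.
function main(params) {
  const rows = Array.isArray(params.input) ? params.input : [];
  // Wrap the array in an object, the usual shape of an OpenWhisk action result.
  return { results: rows.map(processRow) };
}
```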

The flow also includes a rate limiter so that it stays within any OpenWhisk limit on the maximum number of invocations per minute. Setting the rate limiter according to the OpenWhisk (OW) limit avoids action invocations failing because that limit is exceeded. The limit is expressed as the minimum interarrival time between invocations, in milliseconds (msg.payload.value.maxOWmillisecinterarrival).
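The interarrival value can be derived from a per-minute invocation limit. A sketch with hypothetical helper names, assuming the limit of the target OpenWhisk deployment is known (120 per minute below is only an example value) and that the delay node releases the first chunk immediately and spaces the rest by the interval; the second helper gives a rough lower bound on how long dispatching a whole batch takes. The flow's delay node runs in rate-limit mode with the "allow msg.rate" option enabled, which lets a per-message msg.rate in milliseconds override its configured rate.

```javascript
// Convert a per-minute invocation limit into the minimum interarrival time
// (ms) to use as msg.payload.value.maxOWmillisecinterarrival.
function interarrivalMs(invocationsPerMinute) {
  if (!(invocationsPerMinute > 0)) {
    throw new RangeError("invocationsPerMinute must be positive");
  }
  // Round up so the resulting rate never exceeds the limit.
  return Math.ceil(60000 / invocationsPerMinute);
}

// Lower bound on the time needed to release all chunks, assuming the first
// chunk leaves immediately and each further one waits one interval.
function minDispatchMs(chunkCount, intervalMs) {
  return chunkCount > 0 ? (chunkCount - 1) * intervalMs : 0;
}

const interval = interarrivalMs(120); // example limit: 120 invocations/minute
console.log(interval); // 500
console.log(minDispatchMs(100, interval)); // 49500
```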

In the multiprocess execution mode (execution set to "local_multiprocess"), each chunk starts a separate process running the configured script, so the script must be adapted to the way the values are passed. The values of each chunk are stored in a local file with a randomized name, the file name is appended to the script command as its argument, and the file is deleted after the script finishes; the format of the stored data can be inspected in the debug window.

The msg.payload data format is designed to be compatible with the way OpenWhisk passes arguments, so that this flow can be combined directly with a suitable wrapper and executed as a function.
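A minimal sketch of the payload round trip performed by the subflow's "adapt inputs" and "adapt output" function nodes, reduced to the data handling (parameter resolution is omitted). It assumes the envelope in which everything sits under msg.payload.value; adaptInputs and adaptOutput are hypothetical wrappers around the node bodies.

```javascript
// "adapt inputs" (data part only): keep the original request in
// msg.oldpayload and expose the rows as msg.payload for chunking.
function adaptInputs(msg) {
  msg.oldpayload = msg.payload;
  msg.payload = msg.oldpayload.value.values;
  return msg;
}

// "adapt output": after the join, msg.payload holds the per-chunk results;
// restore the original envelope and replace its value with those results.
function adaptOutput(msg) {
  const joined = msg.payload;
  msg.payload = msg.oldpayload;
  msg.payload.value = joined;
  return msg;
}

const msg = { payload: { value: { splitsize: 2, values: [[1], [2], [3]] } } };
adaptInputs(msg);
console.log(JSON.stringify(msg.payload)); // [[1],[2],[3]]
msg.payload = [{ results: [1, 2] }, { results: [3] }]; // stand-in for join output
adaptOutput(msg);
console.log(JSON.stringify(msg.payload)); // {"value":[{"results":[1,2]},{"results":[3]}]}
```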

The configuration parameters can also be set in the subflow's properties dialog; however, values supplied in the incoming message take precedence over those configured defaults.
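The override rule can be sketched as follows, modelled on the hasOwnProperty checks in the "adapt inputs" node: a field present in msg.payload.value wins, otherwise the subflow's configured default is used. In Node-RED the defaults come from env.get(); here a plain object stands in for them, keyed by the message field names (in the flow, the action default is read from innerActionName and the script default from "Shell script"). The names resolveParam, resolveAll and DEFAULTS are hypothetical.

```javascript
// A field present in msg.payload.value wins; otherwise use the default.
function resolveParam(value, name, defaults) {
  return Object.prototype.hasOwnProperty.call(value, name)
    ? value[name]
    : defaults[name];
}

// Defaults as listed in the subflow's environment properties.
const DEFAULTS = {
  splitsize: 1,
  execution: "faas",
  maxOWmillisecinterarrival: 20,
  action: "dockeraction", // env: innerActionName
  shellscript: "/path/to/script", // env: "Shell script"
};

function resolveAll(value) {
  const resolved = {};
  for (const name of Object.keys(DEFAULTS)) {
    resolved[name] = resolveParam(value, name, DEFAULTS);
  }
  return resolved;
}
```

Because presence rather than truthiness is tested, an explicit splitsize of 0 in the message is passed through instead of being replaced by the default.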

This flow was created in the context of the H2020 PHYSICS Project (GA 101017047).

[{"id":"dc80956e15e9e07b","type":"subflow","name":"SplitJoinMultiple","info":"In contrast to the simple SJ pattern that breaks it into single messages, this pattern gets the initial message and chunks it down based on the msg.payload.value.splitsize value. \n\nTherefore if an array of 1000 rows (included in the msg.payload.value.values) is inserted and the splitsize is set to 10, it will create 100 inner calls of the msg.payload.value.action.\n\nThe solution of the msg.payload.value.splitsize was chosen since node properties of the Split node allow only environment variables to be used for setting the split size. These can be configured only at startup and thus can not be used for dynamic management of the pattern during runtime. Furthermore, with this dynamic setting we can also pass the splitsize as an argument when we incorporate this subflow around an HTTP endpoint and the splitsize can be an input argument.\n\nHowever it needs to be noted that now the functions used in the inner parallelized action need to be able to process arrays as inputs.\n\nThe flow includes also a rate limiter, in order to be aligned with any Openwhisk options regarding maximum invocations per minute. By setting the rate limiter to the according OW limit, we can avoid failures of action invocations due to this limit. The parameter can be set in the millisecond interarrival time (msg.payload.value.maxOWmillisecinterarrival).\n\nFurthermore, the flow supports a local multithreaded execution, if the respective function needs to be executed within the top level function. In this case, the incoming value of msg.payload.value.execution needs to be set to \"local_multithread\". If it is set to \"faas\" then the Openwhisk Action that is contained in the msg.payload.value.action is used. \n\nThe flow also supports a local multiprocess execution mode (local_multiprocess option), in which a shell script is provided for this purpose. Each splitted message results in a separate process of that script. The script needs to be adapted to the way the multiple values are passed. A randomized local file is used for storing the values for each execution, while the format of that storage appears in the debug window.\n\nThe data format of the msg.payload is specifically designed so that it is compatible with the way Openwhisk passes arguments so that this flow can be directly combined with the OW Skeleton node.\n\nThe configuration parameters can also be set through the node menu, however the values in the incoming message will override any set values during node initialization.","category":"PHYSICS","in":[{"x":200,"y":300,"wires":[{"id":"e53456c2139266e4"}]}],"out":[{"x":1000,"y":460,"wires":[{"id":"d9d299b0fe0812eb","port":0}]}],"env":[{"name":"splitsize","type":"num","value":"1"},{"name":"execution","type":"str","value":"faas","ui":{"label":{"en-US":"execution"},"type":"select","opts":{"opts":[{"l":{"en-US":"faas"},"v":"faas"},{"l":{"en-US":"local_multithread"},"v":"local_multithread"},{"l":{"en-US":"local_multiprocess"},"v":"local_multiprocess"}]}}},{"name":"maxOWmillisecinterarrival","type":"num","value":"20"},{"name":"innerActionName","type":"str","value":"dockeraction"},{"name":"Shell script","type":"str","value":"/path/to/script"}],"meta":{},"color":"#FFAAAA"},{"id":"906d6a37be3b38fb","type":"split","z":"dc80956e15e9e07b","name":"","splt":"\\n","spltType":"str","arraySplt":"1","arraySpltType":"len","stream":false,"addname":"","x":710,"y":300,"wires":[["3b41725545838f47"]]},{"id":"04683ea925ed9ba4","type":"openwhisk-action","z":"dc80956e15e9e07b","name":"","func":"","namespace":"guest","action":"","params":[],"service":"10b4be71.b76a6a","edit":true,"x":490,"y":440,"wires":[["656e24eb4ac6801c"]]},{"id":"656e24eb4ac6801c","type":"join","z":"dc80956e15e9e07b","name":"","mode":"auto","build":"string","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":"false","timeout":"","count":"","reduceRight":false,"x":790,"y":520,"wires":[["d9d299b0fe0812eb"]]},{"id":"dbd5aba971e09814","type":"comment","z":"dc80956e15e9e07b","name":"NEEDS CONFIGURATION WITH OW ENDPOINT","info":"","x":570,"y":400,"wires":[]},{"id":"e53456c2139266e4","type":"function","z":"dc80956e15e9e07b","name":"adapt inputs","func":"//keep input settings\nif (msg.payload.value.hasOwnProperty('action')){\n    msg.action=msg.payload.value.action;\n} else {\n    msg.action=env.get('innerActionName');\n}\n\nif (msg.payload.value.hasOwnProperty('shellscript')){\n    msg.shellscript=msg.payload.value.shellscript;\n} else {\n    msg.shellscript=env.get('Shell script');\n}\n\nif (msg.payload.value.hasOwnProperty('execution')){\n    msg.execution=msg.payload.value.execution;\n} else {\n    msg.execution=env.get('execution');\n}\n\nif (msg.payload.value.hasOwnProperty('splitsize')){\n    msg.splitsize=msg.payload.value.splitsize;\n} else {\n    msg.splitsize=env.get('splitsize');\n}\n\nif (msg.payload.value.hasOwnProperty('maxOWmillisecinterarrival')){\n    msg.rate=msg.payload.value.maxOWmillisecinterarrival;\n} else {\n    msg.rate=env.get('maxOWmillisecinterarrival');\n}\n\n//can have also msg.rate based on OW limits for actions per minute\n\n\n//move actual values to msg.payload\nmsg.oldpayload=msg.payload;\nmsg.payload={};\nmsg.payload=msg.oldpayload.value.values;\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":330,"y":300,"wires":[["122ff74424914e70"]]},{"id":"d9d299b0fe0812eb","type":"function","z":"dc80956e15e9e07b","name":"adapt output","func":"msg.newpayload=msg.payload;\nmsg.payload=[];\nmsg.payload=msg.oldpayload;\nmsg.payload.value=msg.newpayload;\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":930,"y":520,"wires":[[]]},{"id":"122ff74424914e70","type":"function","z":"dc80956e15e9e07b","name":"array chunks","func":"var splitsize=1;\n\nif (msg.hasOwnProperty('splitsize')){\n    splitsize=msg.splitsize;\n}else{\n    splitsize=1;\n    }\n\nvar arrays = [];\n//console.log('\\nbefore splicing msg payload with splitsize:'+splitsize);    \nwhile (msg.payload.length > 0){ \n    //console.log('\\nsplicing');\n    arrays.push(msg.payload.splice(0, splitsize));  \n}\n  \n  \n\nmsg.payload=arrays;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":510,"y":300,"wires":[["906d6a37be3b38fb"]]},{"id":"3b41725545838f47","type":"function","z":"dc80956e15e9e07b","name":"array to object","func":"\n//the Openwhisk specification needs objects as input arguments\nvar input=msg.payload;\n\nmsg.payload={};\nmsg.payload={input};\nreturn msg;\n\n","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":870,"y":300,"wires":[["6c26a112b5b70fa5"]]},{"id":"6c26a112b5b70fa5","type":"delay","z":"dc80956e15e9e07b","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"30","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":true,"x":240,"y":420,"wires":[["fc22d5c0afab1183"]]},{"id":"c2cc0e46bde1098c","type":"mp-function","z":"dc80956e15e9e07b","name":"threads-ADD MAX THREADS","rqs":"","func":"\n\n\nreturn msg;","persistent":false,"outputs":1,"threads":"30","noerr":0,"x":510,"y":520,"wires":[["656e24eb4ac6801c"]]},{"id":"fc22d5c0afab1183","type":"switch","z":"dc80956e15e9e07b","name":"","property":"execution","propertyType":"msg","rules":[{"t":"eq","v":"faas","vt":"str"},{"t":"eq","v":"local_multithread","vt":"str"},{"t":"eq","v":"local_multiprocess","vt":"str"}],"checkall":"true","repair":false,"outputs":3,"x":290,"y":520,"wires":[["04683ea925ed9ba4"],["c2cc0e46bde1098c"],["dc4eac2d6c92cac1","d61d4275bbe39678"]]},{"id":"a7d020490ba20441","type":"comment","z":"dc80956e15e9e07b","name":"INPUT VALUES THROUGH FILE","info":"","x":190,"y":660,"wires":[]},{"id":"37b86b9497b37417","type":"function","z":"dc80956e15e9e07b","name":"randomize filename","func":"msg.filename='file_'+Math.random();\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":330,"y":620,"wires":[["76201d0e6519042b"]]},{"id":"76201d0e6519042b","type":"file","z":"dc80956e15e9e07b","name":"","filename":"","appendNewline":false,"createDir":false,"overwriteFile":"true","encoding":"none","x":490,"y":620,"wires":[["a19bc71cfb355df8"]]},{"id":"a19bc71cfb355df8","type":"function","z":"dc80956e15e9e07b","name":"Prepare name&inputs","func":"\n//currently the values are in the msg.payload however the exec node\n//needs the command in the msg.payload.\n//Therefore we need to adapt msg fields and then check how the input\n//values are passed to the script, which is case specific\n//msg.values=msg.payload;\n//msg.payload=msg.shellscript;\n\n//how to pass inputs to the script depends on \n//how the script accepts them.\n//if inputs are passed through the cli command, e.g. /script input\n//then the relevant command needs to be built in this node, e.g. if\n//the msg.payload is a string:\n//msg.payload=msg.shellscript+' '+msg.payload;\n\n//if it is an array, it needs to be transformed to a string of values\n//by using the csv node before this node before using the above command\n\n\n//if it is too large, it can be stored in a file prior to this node\n//through the usage of the file node of Node-RED\n//and we can pass the filename argument as input in the script\n//In that case the script should read the input from the file\nmsg.payload=msg.shellscript+' '+msg.filename;\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":440,"y":680,"wires":[["ae551f31809f4569"]]},{"id":"ae551f31809f4569","type":"exec","z":"dc80956e15e9e07b","command":"","addpay":"payload","append":"","useSpawn":"false","timer":"","winHide":false,"oldrc":false,"name":"","x":610,"y":680,"wires":[["656e24eb4ac6801c","a05211b1c6774ea3"],[],[]]},{"id":"a05211b1c6774ea3","type":"file","z":"dc80956e15e9e07b","name":"","filename":"","appendNewline":true,"createDir":false,"overwriteFile":"delete","encoding":"none","x":770,"y":660,"wires":[[]]},{"id":"dc4eac2d6c92cac1","type":"debug","z":"dc80956e15e9e07b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":470,"y":560,"wires":[]},{"id":"d61d4275bbe39678","type":"json","z":"dc80956e15e9e07b","name":"","property":"payload","action":"","pretty":false,"x":150,"y":620,"wires":[["37b86b9497b37417"]]},{"id":"10b4be71.b76a6a","type":"openwhisk-service","name":"MYOPENWHISK","api":"http://10.100.59.182:3233/api/v1"},{"id":"c41c1d32d69a350e","type":"subflow:dc80956e15e9e07b","z":"c640147570033a12","name":"","x":670,"y":400,"wires":[[]]}]

Flow Info

Created 2 years, 3 months ago
Updated 2 years ago

Node Types

Core
  • comment (x2)
  • debug (x1)
  • delay (x1)
  • exec (x1)
  • file (x2)
  • function (x6)
  • join (x1)
  • json (x1)
  • split (x1)
  • switch (x1)
Other
  • mp-function (x1)
  • openwhisk-action (x1)
  • openwhisk-service (x1)
  • subflow (x1)
  • subflow:dc80956e15e9e07b (x1)

Tags

  • spmd
  • faas
  • parallelization

Copy this flow JSON to your clipboard and then import it into Node-RED using the Import From > Clipboard (Ctrl-I) menu option.