Push SolarLog Readings to Influxdb via MQTT

This flow receives a json string from SolarLog PV Monitoring units containing all available measurements. These measurements are then transformed in a function-node to be send to influxdb's telegraf via the mqtt protocol.

SolarLog will update the JSON string every 15 seconds. So it makes no sense to pull it more often. The function node that parses the JSON string will make sure that only updated values are processed.

#How To

##Prerequisites

Setup instructions are widely available in the WWW for these products and may vary according to the hardware (RaspberryPI, i386 etc) and linux distribution.

##Configuration

###NodeRed

####Import Flow Copy the flow.json in raw format to your clipboard. In NodeRed import the copied flow: import_flow

####Configure POST to SolarLog enter image description here

Modify the URL to point to your SolarLog IP. The path needs to be:

IP_Solarlog/getjp

####Parse SolarLog JSON Double-click the function node Parse SolarLog JSON enter image description here

  • Identifier is the key in the SolarLog JSON string that identifies a certain measurement. Do not edit this value.
  • Active can be either set to 0 or 1 and will control, whether this Identifier is processed and send via MQTT. Set to 1 for every value, you want to store in InfluxDB.
  • The topic is needed for the MQTT message broker. This reflects the "path" or "channel" under which the MQTT message broker will publish the data. This will be the channel that we use in Telegraf to collect the data
  • Measurement is needed to put the data into the InfluxDB and will be used along with the tags. Read the InfluxDB manual here and here for a better understanding of the measurement and tag concept.
  • Tags are to be used alongside the measurement and need to be specified in the format Tag_Key = Tag_Value. They can be used to sort, query and cluster data within InfluxDB. You can specify none to an arbitrary number of tags.

####Configure MQTT output enter image description here

Edit the Server to match the IP and port of the MQTT message broker (e.g. Mosquitto).

###Telegraf In the telegraf.conf file you have to specify an Input-Plugin for the MQTT protocol:

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  servers = ["localhost:1883"]
  ## MQTT QoS, must be 0, 1, or 2
  qos = 0

  ## Topics to subscribe to
  topics = [
    "/power/#",
  ]

  # if true, messages that can't be delivered while the subscriber is offline
  # will be delivered when it comes back (such as on service restart).
  # NOTE: if true, client_id MUST be set
  persistent_session = false
  # If empty, a random client ID will be generated.
  client_id = ""

  ## username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to consume.
  ## Each data format has it's own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
  • Modify servers to match the IP and port of the MQTT message broker (e.g. Mosquitto)
  • Modify topics to the topic(s) you used in the function node of the flow. Note that the # will match anything after the first part of the path
[{
	"id": "9d45503b.55366",
	"type": "mqtt out",
	"z": "8d52d7be.014ac",
	"name": "Push to Mosquitto",
	"topic": "",
	"qos": "0",
	"retain": "false",
	"broker": "d4c3cd4f.6e14c",
	"x": 150,
	"y": 480,
	"wires": []
},
{
	"id": "5fdfd82b.6b6fb8",
	"type": "inject",
	"z": "8d52d7be.014ac",
	"name": "Kick http request every 15 sec",
	"topic": "",
	"payload": "",
	"payloadType": "date",
	"repeat": "15",
	"crontab": "",
	"once": false,
	"x": 190,
	"y": 80,
	"wires": [["e17bfb1d.a670b"]]
},
{
	"id": "e17bfb1d.a670b",
	"type": "function",
	"z": "8d52d7be.014ac",
	"name": "Create http request package",
	"func": "msg.headers = {\n    'Content-Length' : '20',\n    'Connection' : 'keep-alive',\n    'Pragma' : 'no-cache', \n    'Cache-Control' : 'no-cache'\n};\nmsg.payload = {\"801\":{\"170\":null}};\nreturn msg;\n",
	"outputs": 1,
	"noerr": 0,
	"x": 180,
	"y": 160,
	"wires": [["ae639c3f.f92c28"]]
},
{
	"id": "ae639c3f.f92c28",
	"type": "http request",
	"z": "8d52d7be.014ac",
	"name": "POST to SolarLog",
	"method": "POST",
	"ret": "txt",
	"url": "192.168.178.15/getjp",
	"x": 150,
	"y": 240,
	"wires": [["7a20040a.089cf4",
	"ede15f1c.6f6e08"]]
},
{
	"id": "7a20040a.089cf4",
	"type": "debug",
	"z": "8d52d7be.014ac",
	"name": "",
	"active": false,
	"console": "false",
	"complete": "false",
	"x": 370,
	"y": 240,
	"wires": []
},
{
	"id": "ede15f1c.6f6e08",
	"type": "function",
	"z": "8d52d7be.014ac",
	"name": "Parse SolarLog JSON",
	"func": "var SolarLogMap = {\n    'lastUpdateTime':{\n\t\tidentifier: '100',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'Pac':{\n\t\tidentifier: '101',\n\t\tactive: 1,\n        topic:'/power/pv/output',\n\t\tmeasurement:'raw_PVoutput',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'Pdc':{\n\t\tidentifier: '102',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'Uac':{\n\t\tidentifier: '103',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'Udc':{\n\t\tidentifier: '104',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'yieldDay':{\n\t\tidentifier: '105',\n\t\tactive: 1,\n        topic:'/power/pv/yieldDay',\n\t\tmeasurement:'raw_PVyieldDay',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'yieldYesterday':{\n\t\tidentifier: '106',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'yieldMonth':{\n\t\tidentifier: '107',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'yieldYear':{\n\t\tidentifier: '108',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'yieldTotal':{\n\t\tidentifier: '109',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'consPac':{ \n\t\tidentifier: '110',\n\t\tactive: 1,\n        topic:'/power/pv/consumption',\n\t\tmeasurement:'raw_PVconsumption',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:'',\n\t\t\ttag3:''\n\t\t}\n    },\n    'consYieldDay':{\n\t\tidentifier: '111',\n\t\tactive: 1,\n        topic:'/power/pv/consYieldDay',\n\t\tmeasurement:'raw_PVconsYieldDay',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'consYieldYesterday':{\n\t\tidentifier: '112',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'consYieldMonth':{\n\t\tidentifier: '113',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'consYieldYear':{\n\t\tidentifier: '114',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'consYieldTotal':{\n\t\tidentifier: '115',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'totalPower':{\n\t\tidentifier: '116',\n\t\tactive: 0,\n        topic:'',\n\t\tmeasurement:'',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    }\n}; \n\n// define padding function to be used in timeString2ms\nfunction pad(number, length)\n{\n    var str = \"\" + number;\n    while (str.length < length) \n\t{\n        str = '0'+str;\n    }\n    return str;\n}\n\nfunction timeString2ms(SLts)\n{\n\t// Split the badly formated SolarLog timestamp\n\tSLts = SLts.split(' ');\n\tvar SLts_date = SLts[0].split('.');\n\t/*\n\t * get local timezone offset since the SolarLog time stamp\n\t * does not contain this information and we want to continue in \n\t * miliseconds which is no longer aware of the time zone\n\t * Note: This assumes that the SolarLog and Node Red instance\n\t * are running in the same time zone\n\t*/\n\tvar offset = new Date().getTimezoneOffset();\n\toffset = ((offset<0? '+':'-') + pad(parseInt(Math.abs(offset/60)), 2) + \":\" + pad(Math.abs(offset%60), 2));\n\t\n\t// Reorder and build new ISOdate time string\n\tvar newDate = '20' + SLts_date[2] + '-' + SLts_date[1] + '-' + SLts_date[0];\n\tvar timeString_new = newDate + 'T' + SLts[1] + offset;\n\t\n\t// Return time stamp as ms\n\treturn Date.parse(timeString_new);\n}\n\n// http://stackoverflow.com/a/11315561/2001479\n/* repeatString() returns a string which has been repeated a set number of times */ \nfunction repeatString(str, num) {\n    out = '';\n    for (var i = 0; i < num; i++) {\n        out += str; \n    }\n    return out;\n}\n\n/*\ndump() displays the contents of a variable like var_dump() does in PHP. dump() is\nbetter than typeof, because it can distinguish between array, null and object.  \nParameters:\n  v:              The variable\n  howDisplay:     \"none\", \"body\", \"alert\" (default)\n  recursionLevel: Number of times the function has recursed when entering nested\n                  objects or arrays. Each level of recursion adds extra space to the \n                  output to indicate level. Set to 0 by default.\nReturn Value:\n  A string of the variable's contents \nLimitations:\n  Can't pass an undefined variable to dump(). \n  dump() can't distinguish between int and float.\n  dump() can't tell the original variable type of a member variable of an object.\n  These limitations can't be fixed because these are *features* of JS. However, dump()\n*/\nfunction dump(v, howDisplay, recursionLevel) {\n    howDisplay = (typeof howDisplay === 'undefined') ? \"alert\" : howDisplay;\n    recursionLevel = (typeof recursionLevel !== 'number') ? 0 : recursionLevel;\n\n\n    var vType = typeof v;\n    var out = vType;\n\n    switch (vType) {\n        case \"number\":\n            /* there is absolutely no way in JS to distinguish 2 from 2.0\n            so 'number' is the best that you can do. The following doesn't work:\n            var er = /^[0-9]+$/;\n            if (!isNaN(v) && v % 1 === 0 && er.test(3.0))\n                out = 'int';*/\n        case \"boolean\":\n            out += \": \" + v;\n            break;\n        case \"string\":\n            out += \"(\" + v.length + '): \"' + v + '\"';\n            break;\n        case \"object\":\n            //check if null\n            if (v === null) {\n                out = \"null\";\n\n            }\n            //If using jQuery: if ($.isArray(v))\n            //If using IE: if (isArray(v))\n            //this should work for all browsers according to the ECMAScript standard:\n            else if (Object.prototype.toString.call(v) === '[object Array]') {  \n                out = 'array(' + v.length + '): {\\n';\n                for (var i = 0; i < v.length; i++) {\n                    out += repeatString('   ', recursionLevel) + \"   [\" + i + \"]:  \" + \n                        dump(v[i], \"none\", recursionLevel + 1) + \"\\n\";\n                }\n                out += repeatString('   ', recursionLevel) + \"}\";\n            }\n            else { //if object    \n                sContents = \"{\\n\";\n                cnt = 0;\n                for (var member in v) {\n                    //No way to know the original data type of member, since JS\n                    //always converts it to a string and no other way to parse objects.\n                    sContents += repeatString('   ', recursionLevel) + \"   \" + member +\n                        \":  \" + dump(v[member], \"none\", recursionLevel + 1) + \"\\n\";\n                    cnt++;\n                }\n                sContents += repeatString('   ', recursionLevel) + \"}\";\n                out += \"(\" + cnt + \"): \" + sContents;\n            }\n            break;\n    }\n\n    if (howDisplay == 'body') {\n        var pre = document.createElement('pre');\n        pre.innerHTML = out;\n        document.body.appendChild(pre);\n    }\n    else if (howDisplay == 'alert') {\n        alert(out);\n    }\n\n    return out;\n}\n// Parse SolarLog's JSON string\nvar mySLdata = [];\n\n// Sometimes parsing errors pop up\n// Try to figure out why\ntry\n{\n    var mySLdata = JSON.parse(msg.payload);\n}\ncatch(err) \n{\n    node.warn(\"Parse Error: \" + err);\n    node.warn(\"Error Msg Obj: \" + dump(msg, 'none', 3));\n    return null;\n}\n\n// Read previous Time Stamp from context\nvar myTimestampOld = context.get('myTimestampOld')||0;\n\n// Get last update time from SolarLog\n// and convert to ms\nvar myTimestampSL = mySLdata['801']['170']['100'];\nvar myTimestamp = timeString2ms(myTimestampSL);\n\n// Compare current Time Stamp to the context time stamp\n// Continue only if current Time Stamp is newer\nif (myTimestampOld == myTimestamp)\n{\n\treturn null;\n}\ncontext.set('myTimestampOld',myTimestamp);\n\n\n// Iterate through SolarLogMap and return \n// corresponding values from SolarLog's data\nvar myResults = [];\nfor(var i in SolarLogMap)\n{\n\tif (SolarLogMap[i]['active'])\n\t{\n\t\tvar myTags =[];\n\t\tfor(var ii in SolarLogMap[i]['tags'])\n\t\t{\n\t\t\tif (SolarLogMap[i]['tags'][ii].length > 0) \n\t\t\t{\n\t\t\t\tmyTags.push(SolarLogMap[i]['tags'][ii]);\n\t\t\t}\n\t\t}\n\t\tvar myTagLine = myTags.join(\",\");\n\t\n\t\tvar myTopic = SolarLogMap[i]['topic'];\n\t\tvar myLine1 = SolarLogMap[i]['measurement'] + (myTagLine.length===0? '':','+ myTagLine);\n\t\tvar myIdentifier = SolarLogMap[i]['identifier'];\n\t\tvar myLine2 = \"value=\" + mySLdata['801']['170'][myIdentifier] + \" \" + (myTimestamp*1000000);\n\t\tvar myPayload = myLine1 + \" \" + myLine2;\n\t\t\n\t\tmyResults.push({topic:myTopic, payload:myPayload});\n\t}\n}\n\nreturn [myResults];",
	"outputs": 1,
	"noerr": 0,
	"x": 160,
	"y": 320,
	"wires": [["7b776343.0846cc",
	"13a21e96.1296b9"]]
},
{
	"id": "7b776343.0846cc",
	"type": "debug",
	"z": "8d52d7be.014ac",
	"name": "",
	"active": false,
	"console": "false",
	"complete": "payload",
	"x": 370,
	"y": 320,
	"wires": []
},
{
	"id": "13a21e96.1296b9",
	"type": "delay",
	"z": "8d52d7be.014ac",
	"name": "Only send one payload each second",
	"pauseType": "rate",
	"timeout": "5",
	"timeoutUnits": "seconds",
	"rate": "1",
	"rateUnits": "second",
	"randomFirst": "1",
	"randomLast": "5",
	"randomUnits": "seconds",
	"drop": false,
	"x": 210,
	"y": 400,
	"wires": [["9d45503b.55366"]]
},
{
	"id": "d4c3cd4f.6e14c",
	"type": "mqtt-broker",
	"z": "8d52d7be.014ac",
	"broker": "192.168.178.8",
	"port": "1883",
	"clientid": "node_red",
	"usetls": false,
	"verifyservercert": true,
	"compatmode": false,
	"keepalive": "60",
	"cleansession": true,
	"willTopic": "",
	"willQos": "0",
	"willRetain": null,
	"willPayload": "",
	"birthTopic": "",
	"birthQos": "0",
	"birthRetain": null,
	"birthPayload": ""
}]
Sineos

Flow Info

created 10 months, 3 weeks ago

Node Types

Core
  • debug (x2)
  • delay (x1)
  • function (x2)
  • http request (x1)
  • inject (x1)
  • mqtt out (x1)
  • mqtt-broker (x1)

Tags

  • SolarLog
  • InfluxDB
  • Telegraf
  • MQTT
Copy this flow JSON to your clipboard and then import into Node-RED using the Import From > Clipboard (Ctrl-I) menu option