Push Volkszaehler Readings to Influxdb via MQTT

This flow connects to the Volkszaehler push-server via a websocket-node and receives json formatted measurements. These measurements are then transformed in a function-node to be send to influxdb's telegraf via the mqtt protocol.

#How To

##Prerequisites

Setup instructions are widely available in the WWW for these products and may vary according to the hardware (Raspberry PI, i386 etc) and linux distribution.

##Configuration

###Vzlogger Make sure you Vzlogger installation past the 28 Jun 2015 to support the push settings.

vzlogger.conf

     // realtime notification settings
    "push": [
        {
            "url": "http://127.0.0.1:5582"  // notification destination, e.g. frontend push-server
        }
],

If your volkszaehler.org instance is running on the same machine as vzlogger you can leave the url setting, otherwise replace it with the IP of your volkszaehler.org instance.

###volkszaehler.org To use the volkszaehler push feature you will need a volkszaehler version after this commit. I recommend the manual installation of volkszaehler, described here in the chapter "Manuelle Installation". This makes updating the installation a lot easier later.

volkszaehler.conf.php

/**
 * Push server settings
 */
$config['push']['enabled'] = true;        // set to true to enable push updates
$config['push']['server'] = 5582;        // vzlogger will push to this ports (binds on 0.0.0.0)
$config['push']['broadcast'] = 8082;    // frontend will subscribe on this port (binds on 0.0.0.0)
$config['push']['routes']['wamp'] = array('/', '/ws');        // routes for wamp access
$config['push']['routes']['websocket'] = array('/socket');            // routes for plain web sockets, try array('/socket')

Modify $config['push']['enabled'] and $config['push']['routes']['websocket'] as shown above.

Change into the volkszaehler.org path and start the push server with:

 php misc/tools/push-server.php

Alternatively you can setup a service to have it started at boot:

sudo nano /etc/systemd/system/push-server.service

paste following template to the file:

[Unit]
Description=push-server
After=syslog.target network.target
Requires=

[Service]
ExecStart=/usr/bin/php /var/www/volkszaehler.org/misc/tools/push-server.php
ExecReload=/bin/kill -HUP $MAINPID
StandardOutput=null
Restart=always

[Install]
WantedBy=multi-user.target

Be sure to have right path to the push-server.php in ExecStart.

To start the push-server use:

 sudo systemctl start push-server

To enable the push-server at boot:

sudo systemctl enable push-server

###NodeRed

####Import Flow Copy the flow.json in raw format to your clipboard. In NodeRed import the copied flow: import_flow

####Configure push-server input enter image description here

Add / modify the URL to point to your push-server IP. If NodeRed and the volkszaehler / push-server is running on the same machine use:

ws://127.0.0.1:8082/socket

####Configure function node Double-click the function node Format payload for influxdb enter image description here

  • Replace the XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX with the actual UUIDs from your volkszaehler.org / vzlogger installation
  • The topic is needed for the MQTT message broker. This reflects the "path" or "channel" under which the MQTT message broker will publish the data. This will be the channel that we use in Telegraf to collect the data
  • Measurement is needed to put the data into the InfluxDB and will be used along with the tags. Read the InfluxDB manual here and here for a better understanding of the measurement and tag concept.
  • Tags are to be used alongside the measurement and need to be specified in the format Tag_Key = Tag_Value. They can be used to sort, query and cluster data within InfluxDB. You can specify none to an arbitrary number of tags.

Within uuidMap you can specify all channels from volkszaehler. It is important to keep the following structure intact:

'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{
    topic:'/power/sml/Bezug',
    measurement:'power_sml',
    tags:{
        tag1:'Type=sml',
        tag2:'Location=Bezug'
        tag3:'bla'
    }
}, 

####Configure MQTT output enter image description here

Edit the Server to match the IP and port of the MQTT message broker (e.g. Mosquitto).

###Telegraf In the telegraf.conf file you have to specify an Input-Plugin for the MQTT protocol:

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  servers = ["localhost:1883"]
  ## MQTT QoS, must be 0, 1, or 2
  qos = 0

  ## Topics to subscribe to
  topics = [
    "/power/#",
  ]

  # if true, messages that can't be delivered while the subscriber is offline
  # will be delivered when it comes back (such as on service restart).
  # NOTE: if true, client_id MUST be set
  persistent_session = false
  # If empty, a random client ID will be generated.
  client_id = ""

  ## username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to consume.
  ## Each data format has it's own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
  • Modify servers to match the IP and port of the MQTT message broker (e.g. Mosquitto)
  • Modify topics to the topic(s) you used in the function node of the flow. Note that the # will match anything after the first part of the path

The Flow

[{
	"id": "8b5844a8.98ceb",
	"type": "websocket-client",
	"z": "f6a99b89.ee40b8",
	"path": "ws://192.168.178.8:8082/socket",
	"wholemsg": "false"
},
{
	"id": "da27812f.a869e",
	"type": "mqtt-broker",
	"z": "f6a99b89.ee40b8",
	"broker": "192.168.178.8",
	"port": "1883",
	"clientid": "node_red",
	"usetls": false,
	"verifyservercert": true,
	"compatmode": false,
	"keepalive": "60",
	"cleansession": true,
	"willTopic": "",
	"willQos": "0",
	"willRetain": null,
	"willPayload": "",
	"birthTopic": "",
	"birthQos": "0",
	"birthRetain": null,
	"birthPayload": ""
},
{
	"id": "2a0bf3da.86486c",
	"type": "debug",
	"z": "f6a99b89.ee40b8",
	"name": "",
	"active": true,
	"console": "false",
	"complete": "payload",
	"x": 470,
	"y": 180,
	"wires": []
},
{
	"id": "c96f99e5.7e0f68",
	"type": "mqtt out",
	"z": "f6a99b89.ee40b8",
	"name": "Push to Mosquitto",
	"topic": "",
	"qos": "0",
	"retain": "false",
	"broker": "da27812f.a869e",
	"x": 190,
	"y": 260,
	"wires": []
},
{
	"id": "46a539c8.f9ef08",
	"type": "debug",
	"z": "f6a99b89.ee40b8",
	"name": "",
	"active": false,
	"console": "false",
	"complete": "payload",
	"x": 390,
	"y": 100,
	"wires": []
},
{
	"id": "92d0eee7.2a6508",
	"type": "function",
	"z": "f6a99b89.ee40b8",
	"name": "Format payload for influxdb",
	"func": "/* Parses Json and formats data to be send to influxdb via \n*  telegraf mqtt plugin\n*  topic = topic used for mqtt\n*  measurement = measurement in influxdb\n*  tagX = tag information in influxdb\n*/\n\nvar uuidMap = {\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/sml/Leistung',\n\t\tmeasurement:'power_sml',\n\t\ttags:{\n\t\t\ttag1:'Type=sml',\n\t\t\ttag2:'Location=Leistung'\n\t\t}\n    },\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/sml/Bezug',\n\t\tmeasurement:'power_sml',\n\t\ttags:{\n\t\t\ttag1:'Type=sml',\n\t\t\ttag2:'Location=Bezug'\n\t\t\ttag3:'bla'\n\t\t}\n    },\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/sml/Lieferung',\n\t\tmeasurement:'power_sml',\n\t\ttags:{\n\t\t}\n    },\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/s0/pv',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n    },\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/s0/og',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'Type=s0',\n\t\t\ttag2:'Location=og'\n\t\t}\n    },\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/s0/eg',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'Type=s0',\n\t\t\ttag2:'Location=eg'\n\t\t}\n    },\n    'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n        topic:'/power/s0/ug',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'Type=s0',\n\t\t\ttag2:'Location=ug'\n\t\t}\n    }\n}; \n\n// Parse JSON \nvar myJsonObj = [];\nvar myJsonObj = JSON.parse(msg.payload);\n\n// Get value, timestam and UUID\nvar myUuid = myJsonObj.data.uuid;\nvar myTimestamp = myJsonObj.data.tuples[0][0];\nvar myValue = myJsonObj.data.tuples[0][1];\n\n// Dynamically read tags from uuidMap\nvar myTags =[];\nfor(var i in uuidMap[myUuid]['tags'])\n{\n\tif (uuidMap[myUuid]['tags'][i].length > 0) \n\t{\n\t\tmyTags.push(uuidMap[myUuid]['tags'][i]);\n\t}\n}\nvar myTagLine = myTags.join(\",\");\n\n// Create output payload\t\nvar myOutput = {};\nmyOutput.topic = uuidMap[myUuid]['topic'];\nmyLine1 = uuidMap[myUuid]['measurement'] + (myTagLine.length==0? '':','+ myTagLine);\nmyLine2 = \"value=\" + myValue + \" \" + (myTimestamp*1000000);\nmyOutput.payload = myLine1 + \" \" + myLine2;\nreturn myOutput;\n\t",
	"outputs": 1,
	"noerr": 6,
	"x": 220,
	"y": 180,
	"wires": [["2a0bf3da.86486c",
	"c96f99e5.7e0f68"]]
},
{
	"id": "69806b97.448f4c",
	"type": "websocket in",
	"z": "f6a99b89.ee40b8",
	"name": "Connect to VZ push-server",
	"server": "",
	"client": "8b5844a8.98ceb",
	"x": 150,
	"y": 100,
	"wires": [["46a539c8.f9ef08",
	"92d0eee7.2a6508"]]
}]
Sineos

Flow Info

created 10 months, 3 weeks ago

Node Types

Core
  • debug (x2)
  • function (x1)
  • mqtt out (x1)
  • mqtt-broker (x1)
  • websocket in (x1)
  • websocket-client (x1)

Tags

  • Volkszaehler
  • Volkszähler
  • InfluxDB
  • Telegraf
  • MQTT
  • Vzlogger
Copy this flow JSON to your clipboard and then import into Node-RED using the Import From > Clipboard (Ctrl-I) menu option