Push Volkszaehler Readings to InfluxDB via MQTT
This flow connects to the Volkszaehler push-server via a websocket node and receives JSON-formatted measurements. A function node then transforms these measurements so that they can be sent via the MQTT protocol to Telegraf, which forwards them to InfluxDB.
#How To
##Prerequisites
- NodeRed
- Vzlogger
- Volkszaehler.org
- Mosquitto or any other MQTT message broker
- Telegraf
- InfluxDB
- Grafana (optional, for visualization)
Setup instructions for these products are widely available online and vary with the hardware (Raspberry Pi, i386, etc.) and the Linux distribution.
##Configuration
###Vzlogger
Make sure your Vzlogger installation dates from after 28 Jun 2015 so that it supports the push settings.
vzlogger.conf
// realtime notification settings
"push": [
{
"url": "http://127.0.0.1:5582" // notification destination, e.g. frontend push-server
}
],
If your volkszaehler.org instance is running on the same machine as vzlogger, you can leave the url setting unchanged; otherwise replace the address with the IP of your volkszaehler.org instance.
###volkszaehler.org
To use the volkszaehler push feature you will need a volkszaehler version after this commit. I recommend the manual installation of volkszaehler, described here in the chapter "Manuelle Installation". This makes updating the installation a lot easier later.
volkszaehler.conf.php
/**
* Push server settings
*/
$config['push']['enabled'] = true; // set to true to enable push updates
$config['push']['server'] = 5582; // vzlogger will push to this port (binds on 0.0.0.0)
$config['push']['broadcast'] = 8082; // frontend will subscribe on this port (binds on 0.0.0.0)
$config['push']['routes']['wamp'] = array('/', '/ws'); // routes for wamp access
$config['push']['routes']['websocket'] = array('/socket'); // routes for plain web sockets, try array('/socket')
Modify $config['push']['enabled'] and $config['push']['routes']['websocket'] as shown above.
Change into the volkszaehler.org path and start the push server with:
php misc/tools/push-server.php
Alternatively, you can set up a service so that it is started at boot:
sudo nano /etc/systemd/system/push-server.service
Paste the following template into the file:
[Unit]
Description=push-server
After=syslog.target network.target
Requires=
[Service]
ExecStart=/usr/bin/php /var/www/volkszaehler.org/misc/tools/push-server.php
ExecReload=/bin/kill -HUP $MAINPID
StandardOutput=null
Restart=always
[Install]
WantedBy=multi-user.target
Make sure that ExecStart contains the correct path to push-server.php.
To start the push-server use:
sudo systemctl start push-server
To enable the push-server at boot:
sudo systemctl enable push-server
###NodeRed
####Import Flow
Copy the flow.json in raw format to your clipboard, then import the copied flow in NodeRed.
####Configure push-server input
Add or modify the URL so that it points to the IP of your push-server. If NodeRed and the volkszaehler push-server are running on the same machine, use:
ws://127.0.0.1:8082/socket
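The URL is made up of the host running the push-server, the broadcast port, and the plain websocket route from volkszaehler.conf.php. A minimal sketch of that relationship, where pushServerUrl is a hypothetical helper (it is not part of NodeRed or volkszaehler) and the defaults simply mirror the config values shown above:

```javascript
// Hypothetical helper: builds the URL for the NodeRed websocket node.
// The defaults are assumptions that mirror $config['push']['broadcast'] = 8082
// and the '/socket' route configured for plain web sockets.
function pushServerUrl(host, port = 8082, route = "/socket") {
  return `ws://${host}:${port}${route}`;
}

// Same machine as the push-server
console.log(pushServerUrl("127.0.0.1"));
// Push-server on another host in the LAN (address taken from the example flow)
console.log(pushServerUrl("192.168.178.8"));
```

If you change the broadcast port or the websocket route in volkszaehler.conf.php, the URL in the websocket node has to change accordingly.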
####Configure function node
Double-click the function node "Format payload for influxdb".
- Replace the XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX placeholders with the actual UUIDs from your volkszaehler.org / vzlogger installation.
- The topic is needed for the MQTT message broker. It is the "path" or "channel" under which the MQTT message broker publishes the data, and it is the channel Telegraf subscribes to in order to collect the data.
- The measurement is needed to put the data into InfluxDB and is used along with the tags. Read the InfluxDB manual here and here for a better understanding of the measurement and tag concept.
- Tags are used alongside the measurement and must be specified in the format Tag_Key=Tag_Value. They can be used to sort, query and cluster data within InfluxDB. You can specify anywhere from zero to an arbitrary number of tags.
Within uuidMap you can specify all channels from volkszaehler. It is important to keep the following structure intact:
'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{
topic:'/power/sml/Bezug',
measurement:'power_sml',
tags:{
tag1:'Type=sml',
tag2:'Location=Bezug',
tag3:'Tag_Key=Tag_Value'
}
},
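To make the role of topic, measurement and tags more concrete, the following sketch reproduces the function node's transformation as a stand-alone script. It is not the flow's exact code: the UUID and the reading are made-up sample values, the message shape (data.uuid, data.tuples) is inferred from the function node in the flow, toMqttMessage is a hypothetical name, and BigInt is used here for the millisecond-to-nanosecond conversion.

```javascript
// Sample mapping with a made-up UUID; empty tags are allowed and get skipped.
const uuidMap = {
  "00000000-0000-0000-0000-000000000001": {
    topic: "/power/sml/Bezug",
    measurement: "power_sml",
    tags: { tag1: "Type=sml", tag2: "Location=Bezug", tag3: "" },
  },
};

// Hypothetical stand-alone version of the function node's logic.
function toMqttMessage(rawPayload, map) {
  const data = JSON.parse(rawPayload).data;
  const entry = map[data.uuid];
  if (!entry) {
    return null; // channel not listed in the map: nothing to publish
  }
  const [timestampMs, value] = data.tuples[0];
  // Keep only non-empty tags and append them to the measurement name
  const tags = Object.values(entry.tags).filter((tag) => tag.length > 0);
  const head = [entry.measurement, ...tags].join(",");
  // Milliseconds to nanoseconds; BigInt keeps the large integer exact
  const timestampNs = BigInt(Math.round(timestampMs)) * 1000000n;
  return {
    topic: entry.topic,
    payload: `${head} value=${value} ${timestampNs}`,
  };
}

// Assumed shape of one push-server message (sample values)
const sample = JSON.stringify({
  data: {
    uuid: "00000000-0000-0000-0000-000000000001",
    tuples: [[1467000000123, 42.5]],
  },
});
console.log(toMqttMessage(sample, uuidMap));
```

For the sample entry this yields the topic /power/sml/Bezug and the payload power_sml,Type=sml,Location=Bezug value=42.5 1467000000123000000, which is InfluxDB line protocol: measurement and tags, a single field named value, then a nanosecond timestamp.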
####Configure MQTT output
Edit the Server setting to match the IP and port of the MQTT message broker (e.g. Mosquitto).
###Telegraf
In the telegraf.conf file you have to specify an input plugin for the MQTT protocol:
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Topics to subscribe to
topics = [
"/power/#",
]
# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
- Modify servers to match the IP and port of the MQTT message broker (e.g. Mosquitto).
- Modify topics to the topic(s) you used in the function node of the flow. Note that the # will match anything after the first part of the path.
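The wildcard behaviour can be illustrated with a small matcher. This is a simplified sketch of MQTT topic-filter matching for illustration only: topicMatches is a hypothetical helper, special cases such as topics starting with $ are ignored, and the real matching is done by the broker.

```javascript
// Simplified MQTT topic-filter matching:
// "+" matches exactly one level, "#" matches all remaining levels.
function topicMatches(filter, topic) {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") {
      return true; // multi-level wildcard: everything from here on matches
    }
    if (i >= topicLevels.length) {
      return false; // filter is longer than the topic
    }
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) {
      return false; // literal level differs
    }
  }
  // Without "#", filter and topic must have the same number of levels
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches("/power/#", "/power/sml/Bezug"));
console.log(topicMatches("/power/#", "/gas/s0"));
console.log(topicMatches("/power/#", "power/sml/Bezug"));
```

Note that the leading slash is a topic level of its own, so the filter /power/# does not match a topic published as power/sml/Bezug. The topics in the function node and in telegraf.conf have to agree on it.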
The Flow
[{"id":"8b5844a8.98ceb","type":"websocket-client","z":"f6a99b89.ee40b8","path":"ws://192.168.178.8:8082/socket","wholemsg":"false"},{"id":"da27812f.a869e","type":"mqtt-broker","z":"f6a99b89.ee40b8","broker":"192.168.178.8","port":"1883","clientid":"node_red","usetls":false,"verifyservercert":true,"compatmode":false,"keepalive":"60","cleansession":true,"willTopic":"","willQos":"0","willRetain":null,"willPayload":"","birthTopic":"","birthQos":"0","birthRetain":null,"birthPayload":""},{"id":"2a0bf3da.86486c","type":"debug","z":"f6a99b89.ee40b8","name":"","active":true,"console":"false","complete":"payload","x":470,"y":180,"wires":[]},{"id":"c96f99e5.7e0f68","type":"mqtt out","z":"f6a99b89.ee40b8","name":"Push to Mosquitto","topic":"","qos":"0","retain":"false","broker":"da27812f.a869e","x":190,"y":260,"wires":[]},{"id":"46a539c8.f9ef08","type":"debug","z":"f6a99b89.ee40b8","name":"","active":false,"console":"false","complete":"payload","x":390,"y":100,"wires":[]},{"id":"92d0eee7.2a6508","type":"function","z":"f6a99b89.ee40b8","name":"Format payload for influxdb","func":"/* Parses JSON and formats data to be sent to influxdb via \n* telegraf mqtt plugin\n* topic = topic used for mqtt\n* measurement = measurement in influxdb\n* tagX = tag information in influxdb\n*/\n\nvar uuidMap = {\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/sml/Leistung',\n\t\tmeasurement:'power_sml',\n\t\ttags:{\n\t\t\ttag1:'Type=sml',\n\t\t\ttag2:'Location=Leistung'\n\t\t}\n },\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/sml/Bezug',\n\t\tmeasurement:'power_sml',\n\t\ttags:{\n\t\t\ttag1:'Type=sml',\n\t\t\ttag2:'Location=Bezug',\n\t\t\ttag3:'Tag_Key=Tag_Value'\n\t\t}\n },\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/sml/Lieferung',\n\t\tmeasurement:'power_sml',\n\t\ttags:{\n\t\t}\n },\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/s0/pv',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'',\n\t\t\ttag2:''\n\t\t}\n },\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/s0/og',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'Type=s0',\n\t\t\ttag2:'Location=og'\n\t\t}\n },\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/s0/eg',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'Type=s0',\n\t\t\ttag2:'Location=eg'\n\t\t}\n },\n 'XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX':{\n topic:'/power/s0/ug',\n\t\tmeasurement:'power_s0',\n\t\ttags:{\n\t\t\ttag1:'Type=s0',\n\t\t\ttag2:'Location=ug'\n\t\t}\n }\n}; \n\n// Parse JSON \nvar myJsonObj = JSON.parse(msg.payload);\n\n// Get value, timestamp and UUID\nvar myUuid = myJsonObj.data.uuid;\nvar myTimestamp = myJsonObj.data.tuples[0][0];\nvar myValue = myJsonObj.data.tuples[0][1];\n\n// Skip channels that are not listed in uuidMap\nif (!uuidMap[myUuid])\n{\n\treturn null;\n}\n\n// Dynamically read tags from uuidMap\nvar myTags =[];\nfor(var i in uuidMap[myUuid]['tags'])\n{\n\tif (uuidMap[myUuid]['tags'][i].length > 0) \n\t{\n\t\tmyTags.push(uuidMap[myUuid]['tags'][i]);\n\t}\n}\nvar myTagLine = myTags.join(\",\");\n\n// Create output payload\t\nvar myOutput = {};\nmyOutput.topic = uuidMap[myUuid]['topic'];\nvar myLine1 = uuidMap[myUuid]['measurement'] + (myTagLine.length==0? '':','+ myTagLine);\nvar myLine2 = \"value=\" + myValue + \" \" + (myTimestamp*1000000);\nmyOutput.payload = myLine1 + \" \" + myLine2;\nreturn myOutput;\n\t","outputs":1,"noerr":0,"x":220,"y":180,"wires":[["2a0bf3da.86486c","c96f99e5.7e0f68"]]},{"id":"69806b97.448f4c","type":"websocket in","z":"f6a99b89.ee40b8","name":"Connect to VZ push-server","server":"","client":"8b5844a8.98ceb","x":150,"y":100,"wires":[["46a539c8.f9ef08","92d0eee7.2a6508"]]}]