The approach I took was to simply concatenate a timestamp to the reading, separated by a , for all messages transported by MQTT. This will obviously impact any other emoncms destinations that you’re emonpi provides data to over MQTT.
Here are the diff outputs that identify the changes I made. I think i’ve captured everything related to this but let me know if you run into any issues and i’ll dig a little deeper to find anything I may have missed:
First off, are the changes to EmonHub that receive the readings and place on MQTT queue:
pi@emonpi:/var/www/emoncms $ cd /usr/share/emonhub
pi@emonpi:/usr/share/emonhub $ git diff
diff --git a/src/interfacers/EmonHubMqttInterfacer.py b/src/interfacers/EmonHubMqttInterfacer.py
index d25958c..35ed085 100644
--- a/src/interfacers/EmonHubMqttInterfacer.py
+++ b/src/interfacers/EmonHubMqttInterfacer.py
@@ -120,7 +120,9 @@ class EmonHubMqttInterfacer(EmonHubInterfacer):
varstr = str(cargo.names[varid-1])
# Construct topic
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/"+varstr
- payload = str(value)
+ # MTC:modified to prefix value with current time - phpmqtt_input.php modified accordingly
+ now = int(time.time())
+ payload = str(now)+":"+str(value)
self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)
Next are the corresponding changes to phpmqtt_input.php that takes messages of the MQTT queue and persists to emoncms. Note the setting of a client identifier for the Mosqiuitto client connection that is required in order for messages to be reliably delivered and should be different for the mosquitto client in phpmqtt_input on each emoncms destination. I also reduced the timeout when i was trying to troubleshoot issues, but not sure if this is strictly necessary. The same is probably true for un-assigning values/time variables. Think I was suspected a memory leak somewhere, but left as is since it’s stable but could probably be improved/tidied up:
pi@emonpi:/var/www/emoncms/scripts $ git diff
diff --git a/scripts/phpmqtt_input.php b/scripts/phpmqtt_input.php
index b5ec231..a953c3c 100644
--- a/scripts/phpmqtt_input.php
+++ b/scripts/phpmqtt_input.php
@@ -1,5 +1,5 @@
<?php
-
+//gc_enable();
// TBD: support user target in message schema
$mqttsettings = array(
'userid' => 1
@@ -86,7 +86,7 @@
require_once "Modules/process/process_model.php";
$process = new Process($mysqli,$input,$feed,$user->get_timezone($mqttsettings['userid']));
- $mqtt_client = new Mosquitto\Client();
+ $mqtt_client = new Mosquitto\Client('emonpi_phpmqtt', false);
$connected = false;
$last_retry = 0;
@@ -108,7 +108,7 @@
$last_retry = time();
try {
$mqtt_client->setCredentials($mqtt_server['user'],$mqtt_server['password']);
- $mqtt_client->connect($mqtt_server['host'], $mqtt_server['port'], 5);
+ $mqtt_client->connect($mqtt_server['host'], $mqtt_server['port'], 60);
$topic = $mqtt_server['basetopic']."/#";
echo "Subscribing to: ".$topic."\n";
$log->warn("Subscribing to: ".$topic);
@@ -177,7 +177,20 @@
if (isset($route[2]))
{
+ // MTC:modified /home/pi/emonhub/src/interfacers/EmonHubMqttInterfacer.py to pass value prefixed with time - time:value
+ $values = explode(":",$value);
+ $time = $values[0];
+ $value = $values[1];
+
$inputs[] = array("userid"=>$userid, "time"=>$time, "nodeid"=>$nodeid, "name"=>$route[2], "value"=>$value);
+
+ $values = null;
+ $time = null;
+ $value = null;
+
+ unset($values);
+ unset($time);
+ unset($value);
}
else
{
In terms of transporting data to remote emoncms instances, i chose to leave that to MQTT by installing MQTT on the destination instance, configuring emoncms to use the local instance and configuring MQTT to connect to the emonpi and subscribe to messages:
connection emonpi
address 192.168.0.23:1883
cleansession false
topic # out 2 emon/ emon/
I’m pretty sure i made changes to MQTT on the emonPi, but they aren’t versioned so here’s a dump of my current config so you can compare to what you have. You’ll almost certainly need the persistence configuration elements e.g. max_queued_messages, persistent_client_expiration, upgrade_outgoing_qos, persistence, persistence_location, etc.:
pi@emonpi:/etc/mosquitto $ cat mosquitto.conf
# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example
pid_file /var/run/mosquitto.pid
autosave_interval 360
max_queued_messages 1000000
persistent_client_expiration 31d
max_inflight_messages 10
allow_duplicate_messages false
upgrade_outgoing_qos true
persistence true
persistence_location /home/pi/data/mosquitto/
max_connections 20
connection_messages false
log_dest file /var/log/mosquitto/mosquitto.log
include_dir /etc/mosquitto/conf.d
allow_anonymous true
password_file /etc/mosquitto/passwd
# Bridge to raspi
connection raspi
start_type automatic
address 192.168.0.22:1883
cleansession false
topic # in 2 emon/currentcost/ emon/currentcost/
keepalive_interval 60
Note that I also have a bridge configured here to another MQTT instance on a raspberry pi that I interface with a currentcost monitor using python and dump messages to MQTT so that data finds it’s way into emoncms too. Here are MQTT relevant snippets from that script if of interest:
...
use Net::MQTT::Simple;
...
my $now;
my $mqtt1 = Net::MQTT::Simple->new("127.0.0.1");
...
# log to mqtt
$now = time();
$now -= 3;
$mqtt1->publish("emon/currentcost/watts", "$now:$watts");
$mqtt1->publish("emon/currentcost/temp", "$now:$temp");
...
This solution has proved to be pretty reliable for me, and can withstand network outages at either source or destination, or destination nodes being shutdown. Data is buffered by MQTT and delivered when destinations become available.
Let me know if anythings unclear, and i’ll try and clarify further