@xavidpr4 I already discussed this point in this topic (my data also comes from Tasmota; funny, @borpin, we use the same things).
I solved all of this and kept the security feature by using MQTT authentication, and everything is working fine.
First of all, look at the MQTT config below (in Tasmota here, but it could be anything else).

I set a user and password: the user name must match the emoncms username (not the user ID) and the password must match that user's read API key. In this example the user is hallard and the node name (as shown in emoncms) is maison.
Then, to publish to the user's area, the topic must match the format below. It is the Tasmota format because I use Tasmota, but it can be changed in the PHP script further down.
topicbase/username/nodename/device_name/tele/SENSOR
Here are the MQTT Explorer view and the emoncms inputs, for better understanding.
OK, the basics are shown; let's go deeper. For this to work you need to set up authentication in Mosquitto using a password file (out of scope here). The idea is to keep the emoncms user names and read API keys from the database in sync with the Mosquitto password file.
Emoncms and Mosquitto each run in their own Docker container, which complicates the scripts a little (because the commands need to run inside a container), but here's the deal. Note that the Mosquitto config is bind-mounted to a local directory, so I can easily access it from the host.
Here is sync_password.sh, which generates a flat text file of username:read_api_key lines by connecting to the emoncms database container and executing a MySQL query.
#!/bin/sh
# Extract Emoncms users as username:read_api_key lines
# (no -t: a TTY would add carriage returns to the redirected output)
docker exec emoncms_db_1 mysql -umysql_user_name -pmysql_password -Demoncms -NBr -e "SELECT username, apikey_read FROM users" | sed 's/\t/:/g' >~/docker/mosquitto/data/config/passwd.emon
I have only a few users for now, so I create them manually once extracted, but it would be easy to write another script for that (in that case it may be better to run it inside the Mosquitto container).
So, look at the text file ~/docker/mosquitto/data/config/passwd.emon, grab the user name and API key, then run a command to update the Mosquitto password file (data/config/passwd in my case):
mosquitto_passwd -b ~/docker/mosquitto/data/config/passwd username 1f29ff88397f32f374771ccc57037a2d
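The manual step above could be looped over the whole extracted file. This is only a minimal sketch, not something I run in production: the function name sync_users and the MOSQUITTO_PASSWD override are hypothetical, and it assumes the Mosquitto password file already exists and that mosquitto_passwd is available where the script runs.

```shell
#!/bin/sh
# Update the Mosquitto password file from a username:apikey list.
# $1 = extracted list (e.g. passwd.emon), $2 = Mosquitto password file
# MOSQUITTO_PASSWD is a hypothetical override, handy for a dry run with "echo".
sync_users() {
    while IFS=: read -r username apikey; do
        # Skip blank or incomplete lines
        [ -n "$username" ] && [ -n "$apikey" ] || continue
        # Drop a trailing carriage return, in case the list was extracted through a TTY
        apikey=$(printf '%s' "$apikey" | tr -d '\r')
        # Same command as the manual step, one call per user
        ${MOSQUITTO_PASSWD:-mosquitto_passwd} -b "$2" "$username" "$apikey" </dev/null
    done < "$1"
}
```

It would be called as sync_users ~/docker/mosquitto/data/config/passwd.emon ~/docker/mosquitto/data/config/passwd, followed by the config reload.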
Then you need to send a signal to the Mosquitto instance so that it reloads its config (the new passwords) without killing existing sessions (reload_config script):
#!/bin/sh
# Reload config in docker container named mosquitto
docker exec -it mosquitto pkill -HUP mosquitto
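To check that the broker accepts the new credentials without waiting for a Tasmota device, a test message can be published by hand. A rough sketch only: the helper names emon_topic and publish_test, the broker host localhost, the base topic emoncms and the payload values are assumptions for illustration, and mosquitto_pub must be installed on the machine you run it from.

```shell
#!/bin/sh
# Build the topic: basetopic/username/nodename/devicename/tele/SENSOR
# $1 = basetopic, $2 = username, $3 = node name, $4 = device name
emon_topic() {
    printf '%s/%s/%s/%s/tele/SENSOR\n' "$1" "$2" "$3" "$4"
}

# Publish one hand-made Tasmota-style payload (the values are made up).
# $1 = emoncms username, $2 = read API key, $3 = node name, $4 = device name
# Assumes the broker listens on localhost and the base topic is "emoncms".
publish_test() {
    mosquitto_pub -h localhost -u "$1" -P "$2" \
        -t "$(emon_topic emoncms "$1" "$3" "$4")" \
        -m '{"ENERGY":{"Power":120,"Voltage":230}}'
}
```

For example: publish_test hallard 1f29ff88397f32f374771ccc57037a2d maison tasmota_1 (the device name here is just a placeholder).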
OK, we're almost done. I then created a new PHP script called emoncms_mqtt_api.php, mainly copied from the original emoncms_mqtt.php.
What I changed, of course, is the topic to subscribe to, so that it matches the Tasmota format defined above (in my case basetopic is emoncms):
$topic = $settings['mqtt']['basetopic']."/+/+/+/tele/SENSOR";
Then the message parsing was changed as follows:
$inputs = array();
// 1. Filter out basetopic
$topic = str_replace($settings['mqtt']['basetopic']."/","",$topic);
// 2. Split by /
$route = explode("/",$topic);
$route_len = count($route);
// Topic is basetopic/username/nodename/devicename/tele/SENSOR
//print("Is Json:".$jsoninput." Time:".$time." route:".$route_len."\n");
// Username is the first entry
$username = $route[0];
// Resolve the emoncms user ID from the username
$userid = $user->get_id($username);
// Node id is second entry
$nodeid = $route[1];
// Filter nodeid, pre input create, to avoid duplicate inputs
$nodeid = preg_replace('/[^\p{N}\p{L}_\s\-.]/u','',$nodeid);
$dbinputs = $input->get_inputs($userid);
if ($jsoninput) {
    $mykey = "ENERGY";
    // Tasmota puts the energy readings in the "ENERGY" object: create one input per entry.
    // is_object/isset is used because array_key_exists() no longer accepts objects in PHP 8.
    if (is_object($jsondata) && isset($jsondata->$mykey)) {
        foreach ($jsondata->$mykey as $key=>$value) {
            $inputs[] = array("userid"=>$userid, "time"=>$time, "nodeid"=>$nodeid, "name"=>$key, "value"=>$value);
        }
    }
}
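To see quickly which parts of a topic end up as the user name and the node name, the same split can be reproduced in the shell. This is just an illustration of the logic above, not part of the setup; the function name split_topic is made up.

```shell
#!/bin/sh
# Split "basetopic/username/nodename/devicename/tele/SENSOR" like the PHP above.
# $1 = basetopic, $2 = full topic; prints "username nodename"
split_topic() {
    rest=${2#"$1"/}          # 1. drop the leading "basetopic/"
    username=${rest%%/*}     # 2. first field is the emoncms username
    rest=${rest#*/}
    nodename=${rest%%/*}     #    second field is the node name
    printf '%s %s\n' "$username" "$nodename"
}
```

With my example, split_topic emoncms emoncms/hallard/maison/tasmota_1/tele/SENSOR gives the user hallard and the node maison (tasmota_1 being a placeholder device name); the device name itself is not used when creating the inputs.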
And voilà. I know it's quick and mostly lacks documentation, but it works. Ideally, it would be awesome if:
- emoncms could fire a command on user creation (to extract the username and API key) and sync Mosquitto
- this modified emoncms_mqtt_api.php script became part of emoncms (it can be renamed)
Here is the full script emoncms_mqtt_api.php:
<?php
/*
**MQTT input interface script**
SERVICE INSTALL INSTRUCTIONS:
https://github.com/emoncms/emoncms/blob/master/docs/RaspberryPi/MQTT.md
EXAMPLES:
create an input from emonTx node called power with value 10:
[basetopic]/emontx/power 10
create an input from node 10 called power with value 10 :
[basetopic]/10/power 10
create input from emontx with key 0 of value 10
[basetopic]/emontx 10
create input from emontx with key 0 of value 10, key 1 of value 11 and key 2 of value 11
[basetopic]/emontx 10,11,12
* [basetopic] and user ID of target Emoncms account can be set in settings.php
Emoncms then processes these inputs in the same way as they would be
if sent to the HTTP Api.
*/
// This code is released under the GNU Affero General Public License.
// OpenEnergyMonitor project:
// http://openenergymonitor.org
define('EMONCMS_EXEC', 1);
$fp = fopen("/var/lock/emoncms_mqtt.lock", "w");
if (! flock($fp, LOCK_EX | LOCK_NB)) { echo "Already running\n"; die; }
chdir(dirname(__FILE__));
require "Lib/EmonLogger.php";
require "process_settings.php";
set_error_handler('exceptions_error_handler');
$log = new EmonLogger(__FILE__);
//$log->set( $settings['log']['location']."/emoncms.log", 0);
$log->info("Starting MQTT Input script");
if (!$settings["mqtt"]["enabled"]) {
//echo "Error MQTT input script: MQTT must be enabled in settings.php\n";
$log->error("MQTT must be enabled in settings.php");
die;
}
$retry = 0;
$mysqli_connected = false;
while(!$mysqli_connected) {
// Try to connect to mysql
$mysqli = @new mysqli(
$settings["sql"]["server"],
$settings["sql"]["username"],
$settings["sql"]["password"],
$settings["sql"]["database"],
$settings["sql"]["port"]
);
if ($mysqli->connect_error) {
$log->error("Cannot connect to MYSQL database:". $mysqli->connect_error);
$retry ++;
if ($retry>3) die;
sleep(5.0);
} else {
$mysqli_connected = true;
break;
}
}
// Enable for testing
// $mysqli->query("SET interactive_timeout=60;");
// $mysqli->query("SET wait_timeout=60;");
if ($settings['redis']['enabled']) {
$redis = new Redis();
if (!$redis->connect($settings['redis']['host'], $settings['redis']['port'])) {
$log->error("Cannot connect to redis at ".$settings['redis']['host'].":".$settings['redis']['port']); die("Check log\n");
}
if (!empty($settings['redis']['prefix'])) $redis->setOption(Redis::OPT_PREFIX, $settings['redis']['prefix']);
if (!empty($settings['redis']['auth'])) {
if (!$redis->auth($settings['redis']['auth'])) {
$log->error("Cannot connect to redis at ".$settings['redis']['host'].", authentication failed"); die("Check log\n");
}
}
} else {
$redis = false;
}
require("Modules/user/user_model.php");
$user = new User($mysqli,$redis,null);
require_once "Modules/feed/feed_model.php";
$feed = new Feed($mysqli,$redis,$settings['feed']);
require_once "Modules/input/input_model.php";
$input = new Input($mysqli,$redis,$feed);
require_once "Modules/process/process_model.php";
$process = new Process($mysqli,$input,$feed,'UTC');
$device = false;
if (file_exists("Modules/device/device_model.php")) {
require_once "Modules/device/device_model.php";
$device = new Device($mysqli,$redis);
}
/*
new Mosquitto\Client($id,$cleanSession)
$id (string) – The client ID. If omitted or null, one will be generated at random.
$cleanSession (boolean) – Set to true to instruct the broker to clean all messages and subscriptions on disconnect. Must be true if the $id parameter is null.
*/
$mqtt_client = new Mosquitto\Client($settings['mqtt']['client_id'],true);
$connected = false;
$subscribed = 0;
$last_retry = 0;
$last_heartbeat = time();
$count = 0;
$pub_count = 0; // used to reduce load relating to checking for messages to be published
$mqtt_client->onConnect('connect');
$mqtt_client->onDisconnect('disconnect');
$mqtt_client->onSubscribe('subscribe');
$mqtt_client->onMessage('message');
// Option 1: extend on this:
while(true){
try {
$mqtt_client->loop();
} catch (Exception $e) {
if ($connected) $log->error($e);
}
if (!$connected && (time()-$last_retry)>5.0) {
$subscribed = 0;
$last_retry = time();
try {
// SUBSCRIBE
$log->warn("Not connected, retrying connection");
$mqtt_client->setCredentials($settings['mqtt']['user'],$settings['mqtt']['password']);
$mqtt_client->connect($settings['mqtt']['host'], $settings['mqtt']['port'], 5);
// moved subscribe to onConnect callback
} catch (Exception $e) {
$log->error($e);
$subscribed = 0;
}
}
// PUBLISH
// loop through all queued items in redis
if ($connected && $redis && $pub_count>10) { // $redis is false when redis is disabled
$pub_count = 0;
$publish_to_mqtt = $redis->hgetall("publish_to_mqtt");
foreach ($publish_to_mqtt as $topic=>$value) {
$redis->hdel("publish_to_mqtt",$topic);
$mqtt_client->publish($topic, $value);
}
// Queue option
$queue_topic = 'mqtt-pub-queue';
$queue_len = $redis->llen($queue_topic); // read once: the list shrinks as items are popped
for ($i=0; $i<$queue_len; $i++) {
if ($connected && $data = filter_var_array(json_decode($redis->lpop($queue_topic), true))) {
$mqtt_client->publish($data['topic'], json_encode(array("time"=>$data['time'],"value"=>$data['value'])));
}
}
}
$pub_count++;
if ((time()-$last_heartbeat)>300) {
$last_heartbeat = time();
$log->info("$count Messages processed in last 5 minutes");
$count = 0;
// Keep mysql connection open with periodic ping
if (!$mysqli->ping()) {
$log->warn("mysql ping false");
die;
}
}
usleep(10000);
}
function connect($r, $message) {
global $log, $connected, $settings, $mqtt_client, $subscribed;
//echo "Connected to MQTT server with code {$r} and message {$message}\n";
$log->warn("Connecting to MQTT server: {$message}: code: {$r}");
if( $r==0 ) {
// if CONNACK return code is zero, the connection was accepted
$connected = true;
if ($subscribed==0) {
// Topic is basetopic/username/nodename/devicename/tele/SENSOR
$topic = $settings['mqtt']['basetopic']."/+/+/+/tele/SENSOR";
$subscribed = $mqtt_client->subscribe($topic,2);
$log->info("Subscribed to: ".$topic." ID - ".$subscribed);
//print("Subscribed to: ".$topic." ID : ".$subscribed."\n");
}
} else {
$subscribed = 0;
$log->error('unexpected connection problem mqtt server:'.$message);
}
}
function subscribe() {
global $log, $topic;
//echo "Subscribed to topic: ".$topic."\n";
$log->info("Callback subscribed to topic: ".$topic);
}
function unsubscribe() {
global $log, $topic, $subscribed;
//echo "Unsubscribed from topic:".$topic."\n";
$subscribed = 0;
$log->error("Unsubscribed from topic: ".$topic);
}
function disconnect() {
global $connected, $log, $subscribed;
$subscribed = 0;
$connected = false;
//echo "Disconnected cleanly\n";
$log->info("Disconnected cleanly");
}
function message($message)
{
try {
$jsoninput = false;
$topic = $message->topic;
$value = $message->payload;
$time = time();
global $settings, $user, $input, $process, $device, $log, $count;
//remove characters that emoncms topics cannot handle
$topic = str_replace(":","",$topic);
//Check and see if the input is a valid JSON and when decoded is an array. A single number is valid JSON.
$jsondata = json_decode($value);
if ((json_last_error() === JSON_ERROR_NONE) /* && is_array($jsondata)*/) {
// JSON is valid - is it an array
$jsoninput = true;
$log->info("MQTT Valid JSON found ");
//print("MQTT Valid JSON found\n");
$key = "Time-Do-Not-Use";
// If JSON, check to see if there is a time value else set to time now.
if (is_object($jsondata) && isset($jsondata->$key)) { // array_key_exists() no longer accepts objects in PHP 8
$inputtime = $jsondata->$key ;
// validate time
if (is_numeric($inputtime)){
$log->info("Valid time in seconds used ".$inputtime);
$time = (int) $inputtime;
} elseif (is_string($inputtime)){
if (($timestamp = strtotime($inputtime)) === false) {
//If time string is not valid, use system time.
$log->warn("Time string not valid ".$inputtime);
$time = time();
} else {
$log->info("Valid time string used ".$inputtime);
$time = $timestamp;
}
} else {
$log->warn("Time value not valid ".$inputtime);
$time = time();
}
} else {
$log->info("No time element found in JSON - System time used");
$time = time();
}
} else {
$jsoninput = false;
$time = time();
}
$log->info($topic." ".$value);
$count ++;
$inputs = array();
// 1. Filter out basetopic
$topic = str_replace($settings['mqtt']['basetopic']."/","",$topic);
// 2. Split by /
$route = explode("/",$topic);
$route_len = count($route);
// Topic is basetopic/username/nodename/devicename/tele/SENSOR
//print("Is Json:".$jsoninput." Time:".$time." route:".$route_len."\n");
// Username is the first entry
$username = $route[0];
// Resolve the emoncms user ID from the username
//$userid = $user->get_id_from_apikey($userid);
$userid = $user->get_id($username);
// Node id is second entry
$nodeid = $route[1];
//print( "UserName:".$username." ID:".$userid." node:".$nodeid."\n");
// Filter nodeid, pre input create, to avoid duplicate inputs
$nodeid = preg_replace('/[^\p{N}\p{L}_\s\-.]/u','',$nodeid);
$dbinputs = $input->get_inputs($userid);
if ($jsoninput) {
$mykey = "ENERGY";
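// Tasmota puts the energy readings in the "ENERGY" object: create one input per entry.
// is_object/isset is used because array_key_exists() no longer accepts objects in PHP 8.
if (is_object($jsondata) && isset($jsondata->$mykey)) {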
foreach ($jsondata->$mykey as $key=>$value) {
$inputs[] = array("userid"=>$userid, "time"=>$time, "nodeid"=>$nodeid, "name"=>$key, "value"=>$value);
}
}
}
if (!isset($dbinputs[$nodeid])) {
//print( "no input(".$nodeid.")\n");
$dbinputs[$nodeid] = array();
if ($device && method_exists($device,"create")) {
$device->create($userid,$nodeid,null,null,null);
//print( "device->create(".$userid.", ".$nodeid.")\n");
}
} else {
//print( "input(".$nodeid.") exists\n");
}
//print_r($inputs);
$tmp = array();
foreach ($inputs as $i)
{
$userid = $i['userid'];
$time = $i['time'];
$nodeid = $i['nodeid'];
$name = $i['name'];
$value = $i['value'];
if ($name != "TotalStartTime") {
$process->timezone = $user->get_timezone($userid);
// Filter name, pre input create, to avoid duplicate inputs
$name = preg_replace('/[^\p{N}\p{L}_\s\-.]/u','',$name);
//print( "inputs[] name:".$name." ");
if (!isset($dbinputs[$nodeid][$name])) {
//print( "not found\n");
$inputid = $input->create_input($userid, $nodeid, $name);
//print( "input->create_input(".$userid.", ".$nodeid.", ".$name.")\n");
if (!$inputid) {
$log->warn("error creating input"); die;
}
$dbinputs[$nodeid][$name] = array('id'=>$inputid);
$input->set_timevalue($dbinputs[$nodeid][$name]['id'],$time,$value);
} else {
//print( "found\n");
$inputid = $dbinputs[$nodeid][$name]['id'];
if (gettype($value) == 'string') {
if (strlen($value) == 4 ) {
if ($name == 'OPTARIF') {
$value = ord($value);
} else if ($name == 'PTEC') {
$value = ord($value[1]);
} else {
$int = 0;
// Pack the 4 characters into one integer ($j avoids reusing the foreach variable $i)
for ($j=0; $j<4; $j++) {
$int = $int << 8;
$int = $int + ord($value[$j]);
}
}
} else if (strlen($value) == 1 ) {
$value = ord($value);
}
}
//print( "input->set_timevalue(".$nodeid.", ".$name.", ".$value.") Type:".gettype($value)."\n");
$input->set_timevalue($dbinputs[$nodeid][$name]['id'],$time,$value);
if ($dbinputs[$nodeid][$name]['processList']) {
$tmp[] = array('value'=>$value,'processList'=>$dbinputs[$nodeid][$name]['processList']);
}
}
}
}
//print_r($tmp);
foreach ($tmp as $i) {
$process->input($time,$i['value'],$i['processList']);
//print( "process->input(".$time.", ".$i['value'].", ".$i['processList'].")\n");
}
} catch (Exception $e) {
$log->error($e);
}
}
function exceptions_error_handler($severity, $message, $filename, $lineno) {
if (error_reporting() == 0) {
return;
}
if (error_reporting() & $severity) {
throw new ErrorException($message, 0, $severity, $filename, $lineno);
}
}
Oh, last but not least, here is how to start this PHP script in the emoncms Docker container:
#!/bin/sh
# Restart the MQTT input script in the emoncms container (note: pkill php stops every php process in it)
docker exec emoncms_web_1 bash -c 'echo "killing emoncms_mqtt_api.php" ; pkill php ; sleep 1 ; echo "restart" ; php /var/www/emoncms/emoncms_mqtt_api.php &'