MQTT to specific user

Hello,

If we check the documentation (https://github.com/emoncms/emoncms/blob/master/docs/RaspberryPi/MQTT.md#node-format), we can see that is possible to use MQTT to send data to a specific user in EmonCMS.

#### emoncms as a subscriber

[basetopic] and user ID of the target Emoncms account can be set in settings.php. Default basetopic = emon, which means Emoncms will subscribe to emon/# where # is any higher level topics.

In my case, I have a local EmonCMS server with multiuser enabled (currently 3 accounts). Each account have its clients posting data using Input API and specifying the Write API key on their URI. That’s how each one have their stuff separated from others.

However, when using MQTT, the data is send to the first account which is the admin one.

How can I do to specify the user id in the basetopic to send a specific info via MQTT to a specific user inside EmonCMS.

Would be great to have at least one example in the documentation.

Thanks in advance!
Xavier.

The PHP version is depreceated for the ini version of settings.

I’m not sure that works anymore.

@TrystanLea will have to comment.

Where is the data coming from? I think you can do this via the HTTP API.

Hello @xavidpr4 this is not currently available Im afraid. Did you want to have mqtt authentication on a per user basis os just a simple direction to each account with the default mqtt username and password?

I cant quite remember now why the part that may have introduced this option is now commented out, see the emoncms_mqtt script here: emoncms/scripts/services/emoncms_mqtt/emoncms_mqtt.php at a44157b8caec8e0f1054dd015a554740ba8a41b6 · emoncms/emoncms · GitHub

There’s also the mqttauth branch of emoncms which introduces a more complete mqtt auth approach but while I’ve been able to update the branch to the latest changes in the master branch, I havent tried this branch for a while GitHub - emoncms/emoncms at mqttauth. It’s probably best to stick to the HTTP API for now.

Hello @TrystanLea and @borpin thanks for your clarification.
I’m not trying to use MQTT credentials for each user or perform MQTT authentication. For me is fine if we have a global MQTT credentials across all the users, however, what i’m looking for is a way that a specific user inside my EmonCMS instance can use MQTT to push content into its user space.

Something like ‘emon/[userid or username or writeAPIkey]/device/sensor’

Right now any user can push content to its user space using the HTTP API, but if they try to use MQTT, the data is received in the input tab of the admin account which is the first account.

The reason to use MQTT is because I have a local EmonCMS instance with 3 users (where my account is the admin) and the other two users want to use Tasmota flashed devices to push the telemetry to EmonCMS, but if they do, the content is sent to my account.

Do you think that this is posible somehow? As far as I see, seems that at some point, it was posible to specify the user in the basetopic of the MQTT to indicate to which user belongs the MQTT message.

Thanks!
Xavier.

Thanks @xavidpr4 all I can say is that it is not possible at the moment. Id have to look at this in a bit more detail to see if its something that could be reintroduced in a simple way as you describe, you are probably best sticking to the http api for now.

Ok, I think i’ll create a script which would be subscribed to the topic tasmota/# and I will request to users to configure their tasmota devices to publish with the topic tasmota/writeAPIkey/sensor…

This way I can identify each user device and convert it to HTTP API.

Anyway thanks a lot @TrystanLea, you have a very good project here!!
Thanks

1 Like

Search for WEBSEND in the Tasmota Docs. Commands - Tasmota

Pretty sure you can use this and a Tasmota Rule.

1 Like

Thanks @xavidpr4 !

@xavidpr4 I already discussed about this point (and my data is coming from Tasmota for me, funny @borpin we use same things) in this topic

I solved all of this, kept security feature using mqtt authentification and all is working fine.

First of all here (in Tasmota but could be anything else) look at the MQTT config below

image

I select user/password, the user name should match the emoncms username (not user ID) and the password should match the user read api key. In this example user is hallard node name (emoncms view) is maison
Then to be able to publish to user area the topic should match (because I use tasmota and it’s Tasmota format) but can be changed later below in php script

topicbase/username/nodename/device_name/tele/SENSOR

Here MQTT explorer view and emoncms inputs for better understanding

Ok basic are show, let’s go deeper. For this to works you need to set authentication with Mosquitto and using password file (out of this scope). The idea is to get emoncms user name/password from database get in sync with MQTT user password file.

Emoncms and MQTT are running into docker (each one has it own instance) that may complexify scripts (because they need to be running into container) but here the deal. Note that mosquitto config is binded onty local disk file so I can access easily from the host

here sync_password.sh that generate a flat text file containing a text file with username/read_api_key connecting emoncms container and execute a mysql query.

#!/bin/sh
# Extract Emoncms Users
docker exec -it emoncms_db_1 mysql -umysql_user_name -pmysql_password -Demoncms  -NBr -e "SELECT username, apikey_read FROM users" | sed 's/\t/:/g' >~/docker/mosquitto/data/config/passwd.emon

I have only few user for now so I’m creating them manually once extracted but it’s easy to make another script for that (in that case may be better to do this one in mosquitto container instance)

So, look at the text file ~/docker/mosquitto/data/config/passwd.emon grab user name and api key then fire a command to update mosquitto password file (data/config/passwd in my case)

mosquitto_passwd -b ~/docker/mosquitto/data/config/passwd username 1f29ff88397f32f374771ccc57037a2d

Then you need to send signal to mosquitto instance to reload config (new password) without killing existing sessions (reload_config script)

#!/bin/sh

# Reload config in docker container named mosquitto
docker exec -it mosquitto pkill -HUP mosquitto

Ok we’re almost done, now I created a new php called emoncms_mqtt_api.php mainly copied from original emoncms_mqtt.php

What I changed of course is the topic to listen according what’s defined above by tasmota (in my case basetopic is emoncms)

$topic = $settings['mqtt']['basetopic']."/+/+/+/tele/SENSOR";

Then parsing the message followed these changes


        $inputs = array();
            
            // 1. Filter out basetopic
            $topic = str_replace($settings['mqtt']['basetopic']."/","",$topic);
            // 2. Split by /
            $route = explode("/",$topic);
            $route_len = count($route);

            // Topic is basetopic/username/nodename/devicename/tele/
            //print("Is Json:".$jsoninput."  Time:".$time."  route:".$route_len."\n");
            
            // Userid or API is first entry
            $username = $route[0];

            // Looks like API format
            $userid = $user->get_id($username);

            // Node id is second entry
            $nodeid = $route[1];

            // Filter nodeid, pre input create, to avoid duplicate inputs
            $nodeid = preg_replace('/[^\p{N}\p{L}_\s\-.]/u','',$nodeid);
            
            $dbinputs = $input->get_inputs($userid);

            if ($jsoninput) {
                $mykey = "ENERGY";
                // If JSON, check to see if there is a time value else set to time now.
                if (array_key_exists( $mykey, $jsondata) ){
                    foreach ($jsondata->$mykey as $key=>$value) {
                        $inputs[] = array("userid"=>$userid, "time"=>$time, "nodeid"=>$nodeid, "name"=>$key, "value"=>$value);
                    }
                }
            }

And voila, I know it’s quick and mainly missing documentation but it works. Ideally what would be awesome

  • that emoncms could fire a command on user creation (to extract username/api key) and sync mosquitto it would be perfect
  • that this modified emoncms_mqtt_api.php script be part of emoncms (can be renamed)

Here the full script emoncms_mqtt_api.php

<?php

    /*
    
    **MQTT input interface script**
    
    SERVICE INSTALL INSTRUCTIONS:
    https://github.com/emoncms/emoncms/blob/master/docs/RaspberryPi/MQTT.md
    
    EXAMPLES:
    
    create an input from emonTx node called power with value 10:
        [basetopic]/emontx/power 10
    
    create an input from node 10 called power with value 10 :
        [basetopic]/10/power 10
        
    create input from emontx with key 0 of value 10
        [basetopic]/emontx 10
        
    create input from emontx with key 0 of value 10, key 1 of value 11 and key 2 of value 11
        [basetopic]/emontx 10,11,12

    * [basetopic] and user ID of target Emoncms account can be set in settings.php
    
    Emoncms then processes these inputs in the same way as they would be
    if sent to the HTTP Api.
    
    */

    // This code is released under the GNU Affero General Public License.
    // OpenEnergyMonitor project:
    // http://openenergymonitor.org
    
    define('EMONCMS_EXEC', 1);

    $fp = fopen("/var/lock/emoncms_mqtt.lock", "w");
    if (! flock($fp, LOCK_EX | LOCK_NB)) { echo "Already running\n"; die; }
    
    chdir(dirname(__FILE__));
    require "Lib/EmonLogger.php";
    require "process_settings.php";
    
    set_error_handler('exceptions_error_handler');
    $log = new EmonLogger(__FILE__);
    //$log->set( $settings['log']['location']."/emoncms.log", 0);
    $log->info("Starting MQTT Input script");
    
    if (!$settings["mqtt"]["enabled"]) {
        //echo "Error MQTT input script: MQTT must be enabled in settings.php\n";
        $log->error("MQTT must be enabled in settings.php");
        die;
    }
    
    $retry = 0;
    $mysqli_connected = false;
    while(!$mysqli_connected) {
        // Try to connect to mysql
        $mysqli = @new mysqli(
            $settings["sql"]["server"],
            $settings["sql"]["username"],
            $settings["sql"]["password"],
            $settings["sql"]["database"],
            $settings["sql"]["port"]
        );
        
        if ($mysqli->connect_error) { 
            $log->error("Cannot connect to MYSQL database:". $mysqli->connect_error);  
            $retry ++;
            if ($retry>3) die;
            sleep(5.0);
        } else {
            $mysqli_connected = true;
            break;
        }
    }
    
    // Enable for testing
    // $mysqli->query("SET interactive_timeout=60;");
    // $mysqli->query("SET wait_timeout=60;");

    if ($settings['redis']['enabled']) {
        $redis = new Redis();
        if (!$redis->connect($settings['redis']['host'], $settings['redis']['port'])) {
            $log->error("Cannot connect to redis at ".$settings['redis']['host'].":".$settings['redis']['port']);  die('Check log\n');
        }
        if (!empty($settings['redis']['prefix'])) $redis->setOption(Redis::OPT_PREFIX, $settings['redis']['prefix']);
        if (!empty($settings['redis']['auth'])) {
            if (!$redis->auth($settings['redis']['auth'])) {
                $log->error("Cannot connect to redis at ".$settings['redis']['host'].", autentication failed"); die('Check log\n');
            }
        }
    } else {
        $redis = false;
    }
    
    require("Modules/user/user_model.php");
    $user = new User($mysqli,$redis,null);
    
    require_once "Modules/feed/feed_model.php";
    $feed = new Feed($mysqli,$redis,$settings['feed']);

    require_once "Modules/input/input_model.php";
    $input = new Input($mysqli,$redis,$feed);

    require_once "Modules/process/process_model.php";
    $process = new Process($mysqli,$input,$feed,'UTC');

    $device = false;
    if (file_exists("Modules/device/device_model.php")) {
        require_once "Modules/device/device_model.php";
        $device = new Device($mysqli,$redis);
    }
    /*
        new Mosquitto\Client($id,$cleanSession)
        $id (string) – The client ID. If omitted or null, one will be generated at random.
        $cleanSession (boolean) – Set to true to instruct the broker to clean all messages and subscriptions on disconnect. Must be true if the $id parameter is null.
    */ 
    $mqtt_client = new Mosquitto\Client($settings['mqtt']['client_id'],true);
    
    $connected = false;
    $subscribed = 0;
    $last_retry = 0;
    $last_heartbeat = time();
    $count = 0;
    $pub_count = 0; // used to reduce load relating to checking for messages to be published
    
    $mqtt_client->onConnect('connect');
    $mqtt_client->onDisconnect('disconnect');
    $mqtt_client->onSubscribe('subscribe');
    $mqtt_client->onMessage('message');

    // Option 1: extend on this:
    while(true){
        try {
            $mqtt_client->loop();
        } catch (Exception $e) {
            if ($connected) $log->error($e);
        }

        if (!$connected && (time()-$last_retry)>5.0) {
            $subscribed = 0;
            $last_retry = time();
            try {
                // SUBSCRIBE
                $log->warn("Not connected, retrying connection");
                $mqtt_client->setCredentials($settings['mqtt']['user'],$settings['mqtt']['password']);
                $mqtt_client->connect($settings['mqtt']['host'], $settings['mqtt']['port'], 5);
                // moved subscribe to onConnect callback

            } catch (Exception $e) {
                $log->error($e);
                $subscribed = 0;
            }
        }

        // PUBLISH
        // loop through all queued items in redis
        if ($connected && $pub_count>10) {
            $pub_count = 0;
            $publish_to_mqtt = $redis->hgetall("publish_to_mqtt");
            foreach ($publish_to_mqtt as $topic=>$value) {
                $redis->hdel("publish_to_mqtt",$topic);
                $mqtt_client->publish($topic, $value);
            }
            // Queue option
            $queue_topic = 'mqtt-pub-queue';
            for ($i=0; $i<$redis->llen($queue_topic); $i++) {
                if ($connected && $data = filter_var_array(json_decode($redis->lpop($queue_topic), true))) {
                    $mqtt_client->publish($data['topic'], json_encode(array("time"=>$data['time'],"value"=>$data['value'])));
                }
            }
        }
        $pub_count++;

        if ((time()-$last_heartbeat)>300) {
            $last_heartbeat = time();
            $log->info("$count Messages processed in last 5 minutes");
            $count = 0;

            // Keep mysql connection open with periodic ping
            if (!$mysqli->ping()) {
                $log->warn("mysql ping false");
                die;
            }
        }

        usleep(10000);
    }

    function connect($r, $message) {
        global $log, $connected, $settings, $mqtt_client, $subscribed;
        //echo "Connected to MQTT server with code {$r} and message {$message}\n";
        $log->warn("Connecting to MQTT server: {$message}: code: {$r}");
        if( $r==0 ) {
            // if CONACK is zero 
            $connected = true;
            if ($subscribed==0) {
                // Topic is basetopic/username/nodename/devicename/tele/
                $topic = $settings['mqtt']['basetopic']."/+/+/+/tele/SENSOR";
                $subscribed = $mqtt_client->subscribe($topic,2);
                $log->info("Subscribed to: ".$topic." ID - ".$subscribed);
                //print("Subscribed to: ".$topic." ID : ".$subscribed."\n");
            }
        } else {
            $subscribed = 0;
            $log->error('unexpected connection problem mqtt server:'.$message);
        }
    }

    function subscribe() {
        global $log, $topic;
        //echo "Subscribed to topic: ".$topic."\n";
        $log->info("Callback subscribed to topic: ".$topic);
    }

    function unsubscribe() {
        global $log, $topic, $subscribed;
        //echo "Unsubscribed from topic:".$topic."\n";
        $subscribed = 0;
        $log->error("Unsubscribed from topic: ".$topic);
    }

    function disconnect() {
        global $connected, $log, $subscribed;
        $subscribed = 0;
        $connected = false;
        //echo "Disconnected cleanly\n";
        $log->info("Disconnected cleanly");
    }

    function message($message)
    {
        try {
            $jsoninput = false;
            $topic = $message->topic;
            $value = $message->payload;
            
            $time = time();

            global $settings, $user, $input, $process, $device, $log, $count;

            //remove characters that emoncms topics cannot handle
            $topic = str_replace(":","",$topic);

            //Check and see if the input is a valid JSON and when decoded is an array. A single number is valid JSON.
            $jsondata = json_decode($value);
            if ((json_last_error() === JSON_ERROR_NONE) /* && is_array($jsondata)*/) {
                // JSON is valid - is it an array
                $jsoninput = true;
                $log->info("MQTT Valid JSON found ");
                //print("MQTT Valid JSON found\n");

                $key = "Time-Do-Not-Use";

                // If JSON, check to see if there is a time value else set to time now.
                if (array_key_exists( $key, $jsondata) ){
                    $inputtime = $jsondata->$key ;

                    // validate time
                    if (is_numeric($inputtime)){
                        $log->info("Valid time in seconds used ".$inputtime);
                        $time = (int) $inputtime;
                    } elseif (is_string($inputtime)){
                        if (($timestamp = strtotime($inputtime)) === false) {
                            //If time string is not valid, use system time.
                            $log->warn("Time string not valid ".$inputtime);
                            $time = time();
                        } else {
                            $log->info("Valid time string used ".$inputtime);
                            $time = $timestamp;
                        }
                    } else {
                        $log->warn("Time value not valid ".$inputtime);
                        $time = time();
                    }
                } else {
                    $log->info("No time element found in JSON - System time used");
                    $time = time();
                }
            } else {
                $jsoninput = false;
                $time = time();
            }

            $log->info($topic." ".$value);
            $count ++;
            
            $inputs = array();
            
            // 1. Filter out basetopic
            $topic = str_replace($settings['mqtt']['basetopic']."/","",$topic);
            // 2. Split by /
            $route = explode("/",$topic);
            $route_len = count($route);

            // Topic is basetopic/username/nodename/devicename/tele/
            //print("Is Json:".$jsoninput."  Time:".$time."  route:".$route_len."\n");
            
            // Userid or API is first entry
            $username = $route[0];

            // Looks like API format
            //$userid = $user->get_id_from_apikey($userid);
            $userid = $user->get_id($username);

            // Node id is second entry
            $nodeid = $route[1];

            //print( "UserName:".$username."  ID:".$userid."  node:".$nodeid."\n");

            // Filter nodeid, pre input create, to avoid duplicate inputs
            $nodeid = preg_replace('/[^\p{N}\p{L}_\s\-.]/u','',$nodeid);
            
            $dbinputs = $input->get_inputs($userid);

            if ($jsoninput) {
                $mykey = "ENERGY";
                // If JSON, check to see if there is a time value else set to time now.
                if (array_key_exists( $mykey, $jsondata) ){
                    foreach ($jsondata->$mykey as $key=>$value) {
                        $inputs[] = array("userid"=>$userid, "time"=>$time, "nodeid"=>$nodeid, "name"=>$key, "value"=>$value);
                    }
                }
            }

            if (!isset($dbinputs[$nodeid])) {
                //print( "no input(".$nodeid.")\n");
                $dbinputs[$nodeid] = array();
                if ($device && method_exists($device,"create")) {
                    $device->create($userid,$nodeid,null,null,null);
                    //print( "device->create(".$userid.", ".$nodeid.")\n");
                }
            } else {
                //print( "input(".$nodeid.") exists\n");
            }

            //print_r($inputs);
            $tmp = array();
            foreach ($inputs as $i)
            {
                $userid = $i['userid'];
                $time = $i['time'];
                $nodeid = $i['nodeid'];
                $name = $i['name'];
                $value = $i['value'];

                if ($name != "TotalStartTime") {
                    $process->timezone = $user->get_timezone($userid);
                    // Filter name, pre input create, to avoid duplicate inputs
                    $name = preg_replace('/[^\p{N}\p{L}_\s\-.]/u','',$name);
                    //print( "inputs[] name:".$name." ");
                    
                    if (!isset($dbinputs[$nodeid][$name])) {
                        //print( "not found\n");
                        $inputid = $input->create_input($userid, $nodeid, $name);
                        //print( "input->create_input(".$userid.", ".$nodeid.", ".$name.")\n");
                        if (!$inputid) {
                            $log->warn("error creating input"); die;
                        }
                        $dbinputs[$nodeid][$name] = true;
                        $dbinputs[$nodeid][$name] = array('id'=>$inputid);
                        $input->set_timevalue($dbinputs[$nodeid][$name]['id'],$time,$value);
                    } else {
                        //print( "found\n");
                        $inputid = $dbinputs[$nodeid][$name]['id'];

                        if (gettype($value) == 'string') {
                            if (strlen($value) == 4 ) {
                                if ($name == 'OPTARIF') {
                                    $value = ord($value);
                                } else if ($name == 'PTEC') {
                                    $value = ord($value[1]);
                                } else {
                                    $int = 0;
                                    for ($i=0; $i<4; $i++) {
                                        $int = $int << 8;
                                        $int = $int + ord($value[$i]);
                                    }
                                    $value = $int;
                                }
                            } else if (strlen($value) == 1 ) {
                                $value = ord($value);
                            }
                        }

                        //print( "input->set_timevalue(".$nodeid.", ".$name.", ".$value.") Type:".gettype($value)."\n");
                        $input->set_timevalue($dbinputs[$nodeid][$name]['id'],$time,$value);
                        
                        if ($dbinputs[$nodeid][$name]['processList']) {
                            $tmp[] = array('value'=>$value,'processList'=>$dbinputs[$nodeid][$name]['processList']);
                        }
                    }
                }
            }
            
            //print_r($tmp);
            foreach ($tmp as $i) {
                $process->input($time,$i['value'],$i['processList']);
                //print( "process->input(".$time.", ".$i['value'].", ".$i['processList'].")\n");
            }

        } catch (Exception $e) {
            $log->error($e);
        }
    }
    
    
    function exceptions_error_handler($severity, $message, $filename, $lineno) {
        if (error_reporting() == 0) {
            return;
        }
        if (error_reporting() & $severity) {
            throw new ErrorException($message, 0, $severity, $filename, $lineno);
        }
    }

Oh, last but not least, to fire this php script in emoncms docker instance

#!/bin/sh
# Reload config in docker container
docker exec emoncms_web_1 bash -c 'echo "killing emoncms_mqtt_api.php" ; pkill php ; sleep 1 ; echo "restart" ; php /var/www/emoncms/emoncms_mqtt_api.php &'