EmonHub Development

The MQTT implementation in the development branch was only ever a proposed MQTT interfacer. The basic framework is there as a PoC, but there is no real processing of the data in either direction, as there was never a proper discussion about the format we were aiming for. It is essentially an incomplete generic mqtt interfacer, but it is bi-directional. With mqtt, emonhub doesn’t need to perform a “read” function; it just needs to maintain the mqtt connection, and when a message comes in the “on_message” function is called (in this instance all it does is print a log message). Only when publishing to a topic does the emonhub code need to call send, which is why it looks like a “send only” interfacer: the interfacer’s “read” function is redundant for mqtt.

But you are right, there is no buffering. That is because at the time I was undecided about whether to add the buffering to the interfacers (doing away with reporters) or to retain the buffered output-only reporters.

Essentially yes. The interfacers have always been bi-directional: the JeeInterfacer has sent out the emonLCD time since day one and it has also configured the rfm2pi using serial output. The reporters have always had buffering and “happened” to also only be one-way.

When putting together the routing code for the experimental version I retained reporters (and included them in the routing) as we did not have any equivalent interfacers yet, and my experimental version was compatible with existing emonhub installs with reporters (and will still need to be), so that we could introduce new interfacers that potentially replaced the reporters. However, whilst experimenting with reporter-like interfacers I discovered that there may well be a benefit to keeping the distinction between non-buffered 2-way and buffered one-way interfacers/reporters.

Yes! Although the priority here is that the connection between emonhub and emoncms is as efficient and reliable as possible, using buffered data and confirmed bulk delivery over MQTT or HTTP(S).

I understand why buffered output may seem unnecessary when you are thinking about MQTT, because you only picture emonhub and emoncms on the same machine. My view is that the connection between emonhub and emoncms could well be on the same machine, but it could also be LAN or WAN connected. The physical location of the components doesn’t need to dictate the connection’s characteristics: if we make a connection that works regardless of where emonhub and emoncms are relative to each other, it will suit all cases, including a local install.

In fact, if you really wanted a connection that assumed emoncms and emonhub were on the same machine and that nothing would ever interrupt the connection between them, you could use a non-buffered, QoS1, per-key topic (2-way local realtime “status” only) type interfacer.

But I fail to see why we would need to force a situation where a frame of data is split up and published across several different topics individually, only so emoncms can then subscribe to the base topic and process all those individual connections, rather than just passing the complete frame to emoncms in one go. MQTT is supposed to provide a lightweight framework, which it fails to do if you are increasing the workload at both ends by forcing per-key topics unnecessarily. The per-key topic tree can be supplied separately (if required) by emonhub for local consumption.

Plus, making users understand why a local emoncms uses a different interfacer to a remote emoncms would be tricky, and supporting 2 methods depending on the location of emoncms is too complex.

Always using a reporter (or reporter-like interfacer) when connecting to any and all emoncms instance(s), regardless of whether it’s mqtt or http(s), is straightforward, very clear and suits all scenarios with one solution.

This was a consideration, and possibly even the favorite solution during initial development, but I thought it might be overly complicated, and I did not feel the decision was needed at that point since I was retaining reporters for compatibility and the framework was more important than any individual interfacer implementation. Since then (over the last 2 years) it has become increasingly obvious they need to be distinctly different. Even if we did implement a “switch” setting, I would consider making it a hardcoded switch that was the only difference between “mqtt-local” and “mqtt-buffered” interfacers, so it was clear what the differences are. This ongoing discussion is, I think, testament to the need for that.

I think the best way forward would be to retain reporters (initially at least) to retain backwards compatibility with the original version. Then if the reporters are there, whilst still supporting the existing emon-pi mqtt and http interfacers we can transition emon-pi users to using the reporters (or all emonhub users to reporter-like interfacers) to provide a more streamlined buffered-bulk-confirmed-delivery to emoncms.

I will answer the questions about the “run”, “add” and “send” methods in another post, as I want to keep discussion about desired functionality separate from the coding. They are different discussions, and I want to avoid the structure of the Python functions defining the desired functionality of emonhub; it needs to be the other way around.

Very true!

Yes, in fact I chose the name “interfacers” because the previously named “listeners” suggested they were only one-way, despite the rfm2pi “listener” transmitting data out over RFM to other nodes.

Previously in OEMG this was further confused by the “OemGatewayRFM2PiListenerRepeater”, which was an alternative “listener” (as the serial rfm2pi could only have a serial conn) that had an inbuilt “socket” listener so it could receive data via a socket (like a socket interfacer) and transmit over RFM. So technically it was a 2-way RFM “listener/transmitter” and socket “listener” rolled into one.

I chose “interfacers” because the code it refers to interfaces with other stuff, potentially in both directions.

Likewise the original OEMG had “buffers” (rather than “reporters”) and those “buffers” were responsible for both buffering and sending on that buffered data to emoncms. When the transition to emonHub was made the “buffers” were renamed “dispatchers” and I later renamed them to “reporters”. IMO the name “dispatchers” didn’t fit the fact they buffered data; dispatching the data sounded (to me) like it was just casting it off into the WWW without a care, whereas “reporter” sounded like it was compiling info and then ensuring that info reached its audience. The latter change was less important but I rolled it out at the same time as renaming the “listeners” as “interfacers”, which IMO was important.

So,

Back to emonHub. Originally each interfacer (listener) had a run() method which was called for each interfacer instance every time emonhub looped, in addition to a separate call to a read() function. Any data received from any interfacer was then added to each reporter (dispatcher) via its add() (as in add to the buffer) function; then, still from within emonhub.py’s run(), each reporter’s flush() was called regardless of whether data was present or not. (See here)

When we made the reporters threaded so they didn’t block the serial port comms, it was necessary to have a run() function in the reporter for the threading module to call. This run() was not implemented to be the same as the similarly named run() function in the interfacers; its name was a requirement of the threading implementation. The new run() function then called the previously used add() function and, also because of the threading, the flush() method had to be called from here too. (See here).

The run() function in the interfacers was there just to action any tasks aside from reading/sending data in each loop (e.g. transmitting the time in the JeeInterfacer). When I implemented threaded interfacers in the experimental version I renamed the interfacers’ run() function to action() so as to free up the run() name for threading.

The run() function in the current interfacers basically loops through 3 steps: call read() and add any data found to the rxq (RX queue); call action() (e.g. send the emonGLCD time in the Jee interfacer); and IF there’s any data on the txq (TX queue), call the send() function for each Cargo in that queue.
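As a rough illustration of that loop, here is a minimal Python sketch. The class name SketchInterfacer, the run_once() helper and the 0.1 s sleep are assumptions made for the example and are not taken from the emonhub code; only the read()/action()/send() split and the rxq/txq queues come from the description above.

```python
import queue
import threading
import time


class SketchInterfacer(threading.Thread):
    """Hypothetical stand-in for a threaded interfacer (not the real emonhub class)."""

    def __init__(self, name):
        super().__init__(name=name, daemon=True)
        self.rxq = queue.Queue()  # items read from the device, bound for emonhub
        self.txq = queue.Queue()  # items routed by emonhub, bound for the device
        self.stop = False

    def read(self):
        """Return one received item, or None. Subclasses override this."""
        return None

    def action(self):
        """Per-loop housekeeping (e.g. broadcasting the time). Subclasses override."""

    def send(self, cargo):
        """Deal with one out-bound item. Subclasses override this."""

    def run_once(self):
        """One pass through the three steps: read, action, send."""
        rxc = self.read()
        if rxc is not None:
            self.rxq.put(rxc)
        self.action()
        while not self.txq.empty():
            self.send(self.txq.get())

    def run(self):
        # The name run() is dictated by threading.Thread; it just repeats run_once().
        while not self.stop:
            self.run_once()
            time.sleep(0.1)  # assumed loop delay, purely illustrative
```

A concrete interfacer would only need to override whichever of read(), action() and send() it actually uses.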

Whereas the run() function within the reporters is still the previous incarnation that came about while threading the reporters. IF I were to make the reporter structure align with the interfacers now, it would use the same/similar interfacer run() method: read() would just pass as it wouldn’t get subclassed; send(), which is essentially “deal with out-bound traffic”, would add the data to the reporter’s buffer after parsing; and the action() function would just keep trying to flush the buffer. There would possibly be no reason to have separate add() and flush() functions as they may well be effectively just renamed as the send() and action() functions.
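To make that mapping concrete, here is a minimal sketch of a reporter-like class where send() takes the place of add() and action() takes the place of flush(). BufferedSketch, the deliver callable and the max_items limit are all hypothetical names and values chosen for illustration; parsing and the actual transport are left out.

```python
from collections import deque


class BufferedSketch:
    """Hypothetical reporter-like interfacer: send() buffers, action() flushes."""

    def __init__(self, deliver, max_items=1000):
        # deliver is an assumed callable that returns True once receipt is confirmed
        self._deliver = deliver
        # when the buffer is full, the oldest frames are dropped first
        self._buffer = deque(maxlen=max_items)

    def read(self):
        # Nothing to read: this side of emonhub is output only.
        return None

    def send(self, frame):
        # "Deal with out-bound traffic" here only means adding to the buffer.
        self._buffer.append(frame)

    def action(self):
        # Keep trying to flush; frames are only discarded after confirmed delivery.
        if not self._buffer:
            return
        batch = list(self._buffer)
        if self._deliver(batch):
            for _ in batch:
                self._buffer.popleft()

    def pending(self):
        return len(self._buffer)
```

With this shape, a failed delivery simply leaves the frames in place for the next call to action().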

Some code does need to be separated out, such as a “send request” function, so that any http based reporter (or interfacer?) can just call send_request() and a reply is passed back.

So currently, yes, the reporters do have a longer chain of methods because they have never been updated to the new way of doing things as per the interfacers. The long chain is a result of accumulated changes, not of specific design.

Further to all this the current run() function in the interfacers (and reporters probably) will probably be undergoing some further change in the near future in relation to the discussion about crashed threads not reporting a traceback to the log and the recent use of @decorators to try and pass the error details out to the main thread.

I’m gonna pause here, for a while at least.

Thanks again for taking the time to write such a detailed set of answers, I prepared the following reply in relation to your first post and have now read your second (thanks again!) which I will reply to in a second post.

I’ve tried to summarise in order to check my understanding and answer in a set of points:

  1. Experimental branch MQTT interfacer is a PoC. Pub format not decided, on_message is a stub, but the concept of a generic MQTT interfacer is an important point: certain aspects of the generic interfacer can be reused by specific format interfacers (init, connection etc). What should the generic interfacer be? Does the proposed reporter-like MQTT interfacer inherit the generic interfacer and just overwrite send with a blank method, perhaps? Perhaps we should come back to this question in future.

  2. Implement a bulk mode MQTT reporter-like interfacer (reasons understood: emoncms can be on WAN, QoS2, bandwidth efficiency). I’m happy with the reasons for this and with working towards this as the main way to send data to emoncms.

  3. Keep a per topic MQTT interfacer QoS1 as an option where users need it, but switch to the buffered QoS2 reporter-like interfacer for the emoncms coms.

  4. A QoS1/QoS2 unbuffered/buffered switch while possible may not make a lot of sense, in the MQTT example the format emon/node/inputname:value cannot be switched into QoS2 buffered mode as there is no time included. A significantly different send or _process_post method is required for the bulk format vs the per topic tree format.

  5. We still have an ongoing question as to whether the long-term direction is to have reporters as a separate thing from interfacers or whether to have reporter-like interfacers. A reporter-like interfacer can re-use the interfacer code in emonhub.py: loading, starting of the interfacer thread and calling of run. A reporter-like interfacer is just an interfacer that implements a buffer, achieving QoS2 with transmitted time and proof of receipt. I’m siding towards the reporter-like interfacer solution, with the option for any interfacer to implement the buffered QoS2 if required. There’s not a huge amount of difference; the benefit is probably just the avoidance of about 20 lines of reporter-specific code in emonhub.py. I’m not sure that separate reporters are more understandable than reporter-like interfacers, considering I completely missed the intention of reporters up to this point as being QoS2 & buffered. It might be better to highlight to prospective developers that there are two ‘output’ modes, un-buffered QoS1 and buffered QoS2, and that they can re-use inherited buffering code implemented in the base class for this rather than writing their own.

  6. We have a question about backwards compatibility: maintaining development branch reporter support. But then for emon-pi variant users backwards compatibility would require the same EmonHubEmoncmsHTTPInterfacer, which is a reporter-like interfacer. One possibility here is that we maintain backwards compatibility by translating the reporters entry in emonhub.conf to point to the EmonHubEmoncmsHTTPInterfacer reporter-like interfacer.

  7. We have an ongoing question about interfacer/reporter-like naming.

  8. EmonHubMqttInterfacer: emon/node/inputname:value format?

  9. EmonHubEmoncmsMqttInterfacer: bulk mode QoS2 format?

We could remove the EmonHub and Interfacer parts as they are repeated and are nested in the configuration file, the main challenge of changing naming is backwards compatibility but not insurmountable.

First, thank you for going through the history of the changes and why things are the way they are now; that’s very useful for context.

This sounds like a good approach, happy for that.

As an interim step for existing emon-pi variant users, as I think we’ve still got a lot of work to do to implement all of the above, I’ve merged my emonhub_buffer branch changes into the emon-pi branch. At least it fixes the http buffer issue where the interfacer would attempt to upload the entire buffer in a single request (something I’m quite worried about as I’ve been seeing large spikes on emoncms.org for a number of months), and it reverts a whole load of code to be closer to the original experimental branch implementation.
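For illustration only, the difference between posting the whole backlog at once and draining it in limited batches might look like the sketch below. The function name flush_in_batches, the post callable and the batch size of 250 are hypothetical; the actual emonhub_buffer changes may work differently.

```python
def flush_in_batches(buffer, post, batch_size=250):
    """Drain a backlog oldest first, one limited batch per request.

    buffer     -- list of buffered frames, modified in place
    post       -- assumed callable; returns True when the server confirms receipt
    batch_size -- illustrative limit on frames per request
    Returns the number of frames delivered.
    """
    delivered = 0
    while buffer:
        batch = buffer[:batch_size]
        if not post(batch):
            # Stop on the first failure; undelivered frames stay buffered.
            break
        del buffer[:len(batch)]
        delivered += len(batch)
    return delivered
```

Stopping at the first failed request keeps the remaining frames in order for the next attempt, so a long outage results in many small requests rather than one very large one.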

Note to readers: The interim update post can be found here

  1. Yes,
  2. Great
  3. Yes.
  4. Agreed.
  5. We do need to give this considerably more thought; luckily we do not need to make a decision right now. A couple of further points to consider here. We can easily move code such as send_request, connect_serial and on_message etc. to an emonhub_common.py file if we decide to retain both interfacers and reporters and want to share code; sharing elements of code needn’t be the reason to lump them all in together. We also need to decide about potential blocking from trying to post a huge buffered backlog. Does a reporter-like interfacer even have a read function? If it doesn’t, is it really a proper interfacer?
    [edit] A third point is that the buffering may well be expanded to use databases or write to disk, so the differences between interfacers and reporter-like interfacers may even become greater. Perhaps the buffer code should be unique to reporters only.
  6. Much of this depends on the outcome of the above. Something else to note here is that emon-pi users will undoubtedly update via the emonPi update script, so it is possible to make regex changes to the emonhub.conf file at the same time as pulling in different code, so changes in interfacer/reporter name will not require a manual change or user involvement/understanding. Any changes will need to be well documented, though, as there will be a lot of posts and guides that refer to older settings. This latter part is unavoidable regardless of how we approach this, as there will be changes regardless of the level of backwards compatibility.
  7. We do
  8. Although we have a fairly clean sheet here, it is important to get right. My current thoughts are to match the functionality of the serial and socket interfacers, where a simple string of values is published or subscribed to on a common base topic with pub and sub topic extensions, all 3 being defined in the conf, with perhaps a switch to use the last topic level and first value in a frame as a nodeid/name to allow some separation. We will need to try/test a couple of things here. The key is to keep it basic and generic: we can always add another mqtt interfacer to do something particular, but a complex generic interfacer could block some implementations, which is the current situation with the “emonhubMQTTinterfacer” in the emon-pi variant.
  9. Essentially yes, BUT! There remain the questions about what it is actually called in the end and whether it is an interfacer, a reporter or a reporter-like interfacer, still to be thrashed out.

As for the release of an interim step, I’m not sure I would go that route, but I am not going to debate or even think about it too much as I’m sure your mind is set.

Thanks Paul, great, I think we’re making progress with understanding the scope of the work required and compiling a kind of todo list of things to work on here. We’ve outlined both significant changes to emoncms (indexed inputs, mqtt bulk format) and emonhub. What do you think is the next step? How do we break this down into manageable chunks?

Not entirely sure, but I do know I need to get into the code. Currently, between working, the forum and these really long discussions, I am not finding any time for emonhub coding, so for me the priority is to set up a test environment and start playing, re-familiarizing myself with the code based on what we have so far.

I’m primarily working from memory at the moment so perhaps once I get rolling, some answers to the outstanding questions might start flowing.

You are right, there are the changes to emoncms to consider as well. It would be good to get an “emonhub” branch set up so we can start on those changes but NOT change anything in the master/stable branches until we get both emonhub and emoncms’s emonhub branch to a place we are happy with. We do not want to make rash changes to emoncms that then dictate a less than agreeable implementation in emonhub; however, it would be very useful to have the changes there in the emoncms “emonhub” branch when we get to test emonhub.

Regarding the emoncms “mqtt” api, what are your thoughts on basically making “mqtt” a parallel route to similar/same api calls?

e.g. the bulk upload over mqtt would be to /emoncms/input/bulk and the payload would be [[ts,id,v1,v2,v3],[ts,id,v1,v2,v3]]. Authentication and identification are something we need to think about too. I would like to see the apikey in there somewhere; I’m guessing it should be in the payload so it isn’t exposed to unauthorized users (that do have access to the mqtt server), although an apikey or username or userid in the topic tree would then allow for an ACF with the apikey or password granting access to that one branch. I know this isn’t needed for the emonpi install but I bring it up now so that we might move closer to a multi-user mqtt implementation in the future; currently there can only be one mqtt account per emoncms server.
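A small sketch of how such a payload could be assembled in Python follows. The function name build_bulk_payload and the (timestamp, node id, values) tuple layout are assumptions for the example, and the apikey is deliberately left out since where it belongs is still an open question.

```python
import json


def build_bulk_payload(frames):
    """Serialise buffered frames as [[ts,id,v1,v2,...],[ts,id,v1,...]].

    frames -- iterable of (timestamp, node_id, values) tuples (assumed layout)
    """
    rows = [[ts, node_id, *values] for ts, node_id, values in frames]
    # Compact separators keep the payload as small as possible.
    return json.dumps(rows, separators=(",", ":"))


payload = build_bulk_payload([
    (1519749672, 16, [1137]),
    (1519749682, 17, [1437, 3164]),
])
print(payload)  # [[1519749672,16,1137],[1519749682,17,1437,3164]]
```

The same string could then be published to a topic or posted over HTTP(S) without changing the buffering code.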

I would also like to add an emoncms http interfacer (non-reporter-like) that uses the “fetch” api so we can publish emoncms feed data via emonhub rather than using the “connect, send and die” publish to mqtt process. Could we also do this via mqtt? If we mimic the http api’s in mqtt, in theory we can use the fetch api, but instead of “replying” to the calling http request it could trigger a publish to a topic (e.g. emoncms/outbound/username/reference) where the “reference” is a unique reference for this query that emonhub supplied and can identify (thinking out loud a bit here, I know what I’m after, but not the best way to do it yet). This would allow users to use emonHub to

Regarding the “bulk” reporters or reporter-like interfacers, are we agreed that they will use indexed CSV format to minimize the RAM used for buffering and the bandwidth used for passing the payload to emoncms? Do we therefore want to look at updating emoncms input names with an api call when changes are detected to the node/input names in emonhub.conf (and on restart of emonhub)?

great, that sounds good.

I will have a think about the emoncms changes and the topic/payload formats you mention and the apikey question.

The fetch via mqtt should be possible but I think quite complex to implement, not sure will have to think about that as well.

yes agreed

yes I guess so… that’s a complex action to add


I’ll try and summarise all of our points in a list so that we can refer back to it as we work on it. It might be good to know which one of us is going to work on each point, or whether we both need to explore a point and come back to it once we’ve both had a chance to test ideas. It looks like a big job; how much time do we need to give ourselves to do the job properly? It looks like it will take much longer than a couple of weeks!! Especially given other commitments.

Not necessarily an immediate concern, but just something to bear in mind whilst structuring the more pressing stuff as something we may want down the line. We can use the http “fetch” api, I just assume that at some point someone will want a fully mqtt setup, it seems daft to go to so much effort to ensure emonhub is posting to emoncms by mqtt not http, and then need to use http for something else.

Maybe, but we may be able to look at expanding the bulk api to help emonhub with this task, perhaps changing the api to accept text input values as names rather than rejecting all non-numerical data (e.g. [[ts,id,v1,v2,v3],[ts,id,"power1","power2","Power3"],[ts,id,v1,v2,v3]]). This way emonhub can just buffer the name changes just as it would any values.

Yes I guessed it would be a significant amount of work, I really have no idea at this point how long it might take. But I do know it will be slow getting started due to needing to familiarize myself with the code again. I doubt much will progress that far at all in 2 weeks, but once we’re rolling it will get quicker, just as well really as I’m sure more work will become apparent as we progress. I already had a snagging list for the experimental version when you released the emon-pi version. I will need to find those notes too.

Hello Paul, apologies for the delay with this. Reading through the above again here is my attempt at summarising the key items raised for our ongoing reference.

  • Establish our approach: interfacers, reporters or reporter-like interfacers
  • Establish our approach: separate interfacer files vs core interfacer file.
  • Review EmonHub internal message queue implementation
  • Consider process chain, standardise on correct approach for use of run, read, send, action, add, flush, process_post.
  • Consider interfacer naming
  • Buffer persistence: Option to save or load a buffer from disk?
  • Review rx & tx node definition in emonhub.conf used by emon-pi variant
  • Document and explain distinction between QoS1/QoS2 unbuffered/buffered interfacers/reporters.
  • Document further use of socket interfacer
  • Consider backwards compatibility when switching from reporters to interfacers or vice versa.
  • MQTT: What should the format of the generic MQTT Interfacer be?
  • MQTT: Specific format MQTT Interfacers can re-use the generic interfacer
  • MQTT: Keep a per-topic MQTT Interfacer QoS1 unbuffered
  • MQTT: how do we handle on_message case
  • MQTT: Implement a bulk mode, buffered QoS2 MQTT reporter-like interfacer
  • MQTT: Pass MQTT data through emonhub from ESP devices
  • EmonHub HTTP interfacer that can call the emoncms fetch api to retrieve data into emonhub
  • Emoncms changes: Implement the input/post and input/bulk formats in MQTT, perhaps extend to other api’s e.g fetch.
  • Emoncms changes: option to have indexed inputs, and separate sending of inputnames.
  • Emoncms changes: support multiple emoncms accounts from the mqtt_input script.
  • Emoncms: review timestamp processing
  • Review threaded exception handling
  • Review error handling in core interfacers
  • systemd service unit
  • simultaneous tx of settings resulting in potential serial crash
  • Review list and single word settings in emon-pi variant

From this my next emonhub related step will be to investigate the indexed inputs option for emoncms and the possibility of sending an inputnames parameter.

You mentioned above a wish to find some time to familiarise yourself with the code. Let me know when you would like my input again.

No problems at all. It feels like I’ve been dragging my feet a bit, although I have been spending some time on emonhub over the last week or 2.

I have been looking closer at emonhub and trying to “get into it” again, but that hasn’t been easy; I seem to have lost a few repos and branches. Stuff I have explored, partially fixed or already made progress with previously doesn’t seem to be to hand. Not sure whether to spend time looking for it or just start over.

The list is pretty complete, there are other things that we haven’t yet discussed (on my hit list) and there are some things like the threaded exception handling that need looking at, at the very least I would like to see the @decorator replaced with a better implementation, but I’m not convinced it’s working as expected. There are posts that show the thread is dead issue after the fix was released and there are logs that show frequent restarting of the threads without any tracebacks.
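One possible alternative to a decorator, sketched here under assumptions and not as the fix that was released, is to catch the exception inside the thread’s own run() and hand it to the logger, which records the full traceback. LoggedThread, work() and the error attribute are hypothetical names used only for this example.

```python
import logging
import threading


class LoggedThread(threading.Thread):
    """Hypothetical base class: an exception that kills the thread is always logged."""

    def __init__(self, name, logger=None):
        super().__init__(name=name, daemon=True)
        self.logger = logger or logging.getLogger("EmonHub")
        self.error = None  # kept so the main thread can inspect what went wrong

    def work(self):
        """Subclasses put their loop here instead of overriding run()."""
        raise NotImplementedError

    def run(self):
        try:
            self.work()
        except Exception as exc:
            self.error = exc
            # Logger.exception() logs at ERROR level and appends the traceback.
            self.logger.exception("Thread %s died", self.name)
```

The main loop could then check is_alive() and error on each pass and decide whether to restart the thread, with the traceback already in the log.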

Overall the error handling needs to improve, I can improve the common code but the read and write functions will be the responsibility of each interfacer, I would like to try and make the core as robust as possible and reduce the checking needed in each interfacer if possible.

Use of textual names throughout emonhub is something I would like to see implemented sooner rather than later.

I’m not sure I understand why scales and datapoints have been removed from the cargo object whilst names and node name have been added.

We also need to look at creating a systemd service unit.

Also, the way serial commands are sent from the main thread when changing Jee settings could cause a crash if 2 threads are accessing the same serial port at the same time.

The way lists and single word settings are handled in the emon-pi variant needs addressing too; it is not necessary to use a trailing comma if implemented correctly. I have also found a better way of handling boolean settings.

Hopefully the pace will pick up a bit as I get more familiar with the code again.

Can you also look at the timestamp processing?

ref Best way to guarantee times when using BULK load with PHPTIMESERIES? - #2 by pb66

Personally I would like to see the legacy mode updated to be “absolute” so that omitting a “mode” means the supplied timestamps are used without adjustment. The only alternative would seem to be adding “&time=0” to the end of each request (minor, but annoyingly pointless if we are trying to make the bulk upload as minimal as possible). The current “&sentat=” is open to potential errors.

perhaps we could change these lines

        // Legacy mode: input/bulk.json?data=[[0,16,1137],[2,17,1437,3164],[4,19,1412,3077]]
        else {
            $time_ref = time() - (int) $data[$len-1][0];
        }

to something like

        // Legacy mode: input/bulk.json?data=[[0,16,1137],[2,17,1437,3164],[4,19,1412,3077]]
        // (last timestamp is too small to be a full unix timestamp, so treat it as an offset)
        elseif ((int) $data[$len-1][0] < 1000000000)
        {
            $time_ref = time() - (int) $data[$len-1][0];
        }
        // New default "absolute" timestamp mode: input/bulk.json?data=[[1519749672,16,1137],[1519749682,16,1437,3164],[1519749692,16,1412,3077]]
        else
        {
            $time_ref = 0;
        }

By checking whether the timestamp of the last packet is in the range of a complete unix timestamp or is just an offset, we could set $time_ref to 0 when absolute timestamps are used.
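The same range check expressed in Python, purely as a worked example of how the two sample requests would be treated. The function name time_ref_for and the now parameter are hypothetical; the 1000000000 cut-off is the one used in the PHP suggestion.

```python
import time

UNIX_THRESHOLD = 1_000_000_000  # below this the value is treated as an offset


def time_ref_for(data, now=None):
    """Return the reference time to add to each row's first element."""
    now = int(time.time()) if now is None else int(now)
    last = int(data[-1][0])
    if last < UNIX_THRESHOLD:
        # Legacy mode: rows carry offsets and the last row is "now".
        return now - last
    # Absolute mode: rows already carry full unix timestamps.
    return 0


legacy = [[0, 16, 1137], [2, 17, 1437, 3164], [4, 19, 1412, 3077]]
absolute = [[1519749672, 16, 1137], [1519749682, 16, 1437, 3164]]
print(time_ref_for(legacy, now=1519749700))    # 1519749696
print(time_ref_for(absolute, now=1519749700))  # 0
```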

Thanks Paul, Great, I’ve added the additional items you have highlighted to the list including the timestamp processing.

Note this thread about emonLCD and mqtt.

We will also need to think about a “Nodes legacy mode” mqtt by frame in the QoS1 mqtt interfacer.

Also we should create a logrotate entry to put in /etc/logrotate.d/emonhub so that emonhub.log files are not rotated out so frequently. We could consider reducing the size of the files from 5MB to 3MB if a potential 10MB is too much, but we should let emonhub manage its own log files and only use logrotate to compress emonhub.log.1.

Revise JeeLib confirmation messages ref “confirmed sent packet size: → ack” in Inputs Nodes Value only rssi Error after update of Emoncms version low-write 9.8.28 - #16 by pb66.

Also need to re-implement or fix the ability for the emonhub.log to display the emonPi/rfm69pi firmware revision again.

@TrystanLea - I would like to invite @beaylott to this discussion, do you have any issues with that?