OpenEnergyMonitor Community

Way to pause and buffer emonHub output?

I am working on transitioning from using an EmonHubEmoncmsReporter to using the EmonHubMqttInterfacer to communicate with my emonCMS server. One of the things I liked about the EmonHubEmoncmsReporter was the ability to pause and buffer the output. Since my emonHub runs on a separate Raspberry Pi server, this allowed me to do things like restart the emonCMS server and not lose any input data. I would simply uncomment “pause=out” and then comment it out again when emonCMS came back up.

[[Home]]
    Type = EmonHubEmoncmsReporter
    [[[init_settings]]]
    [[[runtimesettings]]]
    #pause = out

Is there any equivalent I could use with the MQTT interfacer?

Thanks!
Brandon

Originally the pause settings were core features and available to all interfacers so it’s worth trying the same setting in the mqtt interfacer.

HOWEVER!

The MQTT interfacer didn’t previously send/use timestamps when publishing the data to emoncms, so when you unpause, all the data would flood through and receive very similar “now” timestamps as it arrives at emoncms. So even if pause works, the buffered data is of no use. So (IIRC) Brian has made some changes to add an optional timestamped JSON format; you would need to swap to that to make pause worthwhile on MQTT.

I’ve never tested the pause feature so it partly depends how quickly the data is sent, how well the broker buffers it and how well the receiving script handles it (lots of variables).

As Paul says, I did add a feature to the MQTT interfacer so the data can be sent as a single message with an optional timestamp, rather than each value to a separate topic. You would need to use this format.
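As a rough illustration of the difference, here is a minimal Python sketch of packing one node's values into a single JSON message with an optional timestamp. The function name `build_json_message`, the `time` key and the `basetopic + node` topic layout are assumptions made for the example, not taken from the emonhub source.

```python
import json


def build_json_message(basetopic, node, values, timestamp=None):
    """Return (topic, payload) for one node's readings as a single JSON message.

    Assumed layout: topic is basetopic + node name, payload is a JSON object
    of input names to values, plus an optional "time" key (Unix seconds).
    """
    payload = dict(values)
    if timestamp is not None:
        # Carrying the original reading time lets the receiver store
        # buffered data at the right point instead of at "now".
        payload["time"] = int(timestamp)
    return basetopic + node, json.dumps(payload)


topic, payload = build_json_message(
    "emon/JSON/", "emontx1", {"power1": 120, "power2": 45}, timestamp=1600000000
)
print(topic, payload)
```

Without the timestamp argument the payload carries only the values, which is the case where buffered frames would all be stamped on arrival.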

In theory it might work :slight_smile:

I would be interested in trying this. I set up a test server that is a copy of my currently running emonBase. How would I go about installing the change, and is there anything I need to do once installed to switch the live server over to JSON?

Thanks!
Brandon

I started by adding these lines to my emonHub config at the end of the MQTT interfacer section.

 node_JSON_enable = 1
 node_JSON_basetopic = emon/JSON/

Once I restarted emonHub, I now had all my inputs appearing under a “JSON” topic.

For example, in addition to

emonTx1
power1
power2

emonTx2
power1
power2

I now had a JSON topic with all the other topics under it

JSON
emontx1_power1
emontx1_power2

emontx2_power1
emontx2_power2

Since this isn’t what I would want if I were to transition to this format for live production use, I tried changing the JSON base topic to emon/ and disabling the nodevar format.

[[MQTT]]

    Type = EmonHubMqttInterfacer
    [[[init_settings]]]
        mqtt_host = 127.0.0.1
        mqtt_port = 1883
        mqtt_user = emonpi
        mqtt_passwd = emonpimqtt2016

    [[[runtimesettings]]]
        #pause = out
        pubchannels = ToRFM12,
        subchannels = ToEmonCMS,

        # emonhub/rx/10/values format
        # Use with emoncms Nodes module
        node_format_enable = 1
        node_format_basetopic = emonhub/

        # emon/emontx/power1 format - use with Emoncms MQTT input
        # http://github.com/emoncms/emoncms/blob/master/docs/RaspberryPi/MQTT.md
        nodevar_format_enable = 0
        nodevar_format_basetopic = emon/

        node_JSON_enable = 1
        node_JSON_basetopic = emon/

This worked. Now all my inputs were working and logging to feeds as expected, but using the JSON format.

The thing that I could not get working was the “pause = out” function.

I tried saving it right under the runtime settings in the MQTT interfacer

[[[runtimesettings]]]
    pause = out

But this didn’t seem to pause the inputs from updating. Any suggestions? It looks like the pause setting is still looked for by the MQTT interfacer code.

Brandon

Good to see the modification worked as intended i.e. a drop in replacement :slight_smile:

No it won’t. See this comment from the original code.

MQTT is not generally used to buffer data but used as an ‘overwrite’ type protocol - it does have some buffering builtin, but I have not seen the buffering employed anywhere in anger.

Maybe the built in buffering of MQTT is the way to achieve the “pause” function. It appears that it is able to retain 5000 messages, which is not insignificant.

MQTT restrictions and limitations (ibm.com)

Subscription buffers

Each subscription from either a device or application is allocated a buffer of 5000 messages. The buffer allows for any application or device to fall behind the live data it is processing, and to also build up a backlog of up to 5000 pending messages for each subscription. When the buffer is full, the oldest messages are discarded when a new message is received.

For MQTT version 5.0, use the MQTT clean start and session expiry options to access the subscription buffer. When clean start is set to false and session expiry is set to a nonzero value, the subscriber receives messages from the buffer. When clean start is set to true and session expiry is 0, the buffer is reset.

For MQTT version 3.1.1 or 3.1, use the MQTT clean session option to access the subscription buffer. When clean session is set to false, the subscriber receives messages from the buffer. When clean session is set to true, the buffer is reset.

Note: The subscription buffer limit applies regardless of the quality of service setting that is used. It is possible that a message that is sent at level 1 or 2 might not be delivered to an application that is unable to keep up with the messages rate for its subscription.
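The behaviour described in that quote can be modelled with a small Python sketch. This is a toy model only, not broker code: the class name `SubscriptionBuffer` and its methods are made up for illustration, and the 5000 limit is the figure quoted from the IBM page rather than a property of every broker.

```python
from collections import deque


class SubscriptionBuffer:
    """Toy model of a per-subscription buffer held while a client is offline."""

    def __init__(self, limit=5000):
        # A deque with maxlen silently discards the oldest entry when full.
        self.pending = deque(maxlen=limit)

    def store(self, message):
        """Queue a message that arrived while the subscriber was away."""
        self.pending.append(message)

    def connect(self, clean_session):
        """Return the backlog delivered when the subscriber reconnects."""
        if clean_session:
            # Clean session: the buffer is reset, nothing is delivered.
            self.pending.clear()
            return []
        backlog = list(self.pending)
        self.pending.clear()
        return backlog


buf = SubscriptionBuffer(limit=3)
for reading in range(5):
    buf.store(reading)
backlog = buf.connect(clean_session=False)
print(backlog)
```

With a limit of 3 and five stored readings, only the newest three survive, which is the same trade-off the note about QoS 1 and 2 points at.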

I think the next thing I’ll investigate is the “cleansession” flag in my bridge configuration.

MQTT (Message Queuing Telemetry Transport) can queue/buffer data if the whole route from publisher to subscriber is QoS2 and configured correctly. As discussed in the emoncms discussions way back, there needs to be a unique id defined to avoid the cleansession default setting.

The only issue with that is when users have an MQTT broker on another device it may not be able to queue the data if there’s a network issue. The idea of emonhub was to have timestamping, decoding, naming, scaling etc. and buffering as close to source as possible. Local buffering gives opportunity for other features such as throttling data (as done in the http interfacer) so multiple/many frames of data could be passed in a single MQTT message. Or for example, I (and other users) have used a CSV interfacer to save a copy of the data to disk locally, and buffering to minimise the disk writes is of value to maximise an sdcard’s lifespan.

The buffering is in-built to the core of emonhub and was active on the MQTT interfacer at one time, but when the value of that was debated due to the lack of timestamps on the buffered data, the buffering was removed from that interfacer rather than progressing the mqtt interfacer to include timestamps at that time. So, if timestamps are now optional, it would be good to see the buffering reinstated when that option is used.

This is quite true of the way mqtt has been implemented by OEM and was discussed in detail when Trystan and I reviewed emonhub and its future development. Either MQTT messages can be short lived “status only” type messages that are simply overwritten and not buffered, or they can be “timestamped data”, where every effort should be made to collect and retain each message to plot a full picture. To do that, MQTT must be QoS2 end to end and ideally buffered as close to source as possible. Using the broker could be ok for “all on one device” setups but since emonhub has the capability, that should be the way we do it to provide a more flexible and consistent application. (IMO :grin:).

I would love to see the buffering reinstated. Let me know if I can help test, develop, whatever.

In the meantime, I have been testing using Node-RED on the emonhub server as a bridge to an emonCMS server. I connect an MQTT inbound from the local emonhub to an MQTT outbound to emonCMS, then put a delay between them. I tested how long I can buffer timestamped MQTT frames in the delay node, then flush them out to emonCMS and still have emonCMS record all of the frames. The answer is about 80-90 seconds in my case. I have about 20 different nodes inbound to emonhub.

Here’s the flow.


[{"id":"5db8772e.416b","type":"mqtt in","z":"dc2f1355.d367d","name":"emon/# on emonHub server","topic":"emon/#","qos":"2","datatype":"auto","broker":"bb796140.4b562","x":160,"y":500,"wires":[["ce4109c9.535b58"]]},{"id":"7493a1be.66c78","type":"mqtt out","z":"dc2f1355.d367d","name":"","topic":"MQTT Outbound to emonCMS Server","qos":"2","retain":"true","broker":"15c72fc4.fb1848","x":930,"y":500,"wires":[]},{"id":"2095e7ef.5ea4e","type":"delay","z":"dc2f1355.d367d","name":"delay","pauseType":"delayv","timeout":"20","timeoutUnits":"minutes","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":670,"y":500,"wires":[["7493a1be.66c78"]]},{"id":"b765bf1f.7b2f18","type":"inject","z":"dc2f1355.d367d","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":120,"y":400,"wires":[["32d84fdb.4cbda"]]},{"id":"95f3fe32.987cf8","type":"inject","z":"dc2f1355.d367d","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":120,"y":440,"wires":[["905084c2.841ea"]]},{"id":"32d84fdb.4cbda","type":"change","z":"dc2f1355.d367d","name":"Pause 30 Minutes","rules":[{"t":"set","p":"delay","pt":"flow","to":"108000000","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":310,"y":400,"wires":[[]]},{"id":"905084c2.841ea","type":"change","z":"dc2f1355.d367d","name":"Unpause","rules":[{"t":"set","p":"delay","pt":"flow","to":"0","tot":"str"},{"t":"set","p":"flush","pt":"msg","to":"1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":280,"y":440,"wires":[["2095e7ef.5ea4e"]]},{"id":"ce4109c9.535b58","type":"change","z":"dc2f1355.d367d","name":"Set Delay","rules":[{"t":"set","p":"delay","pt":"msg","to":"delay","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":500,"y":500,"wires":[["2095e7ef.5ea4e"]]},{"id":"bb796140.4b562","type":"mqtt-broker","name":"","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":true,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","closeTopic":"","closeRetain":"false","closePayload":"","willTopic":"","willQos":"0","willRetain":"false","willPayload":""},{"id":"15c72fc4.fb1848","type":"mqtt-broker","name":".50","broker":"192.168.1.50","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","closeTopic":"","closeQos":"0","closeRetain":"false","closePayload":"","willTopic":"","willQos":"0","willRetain":"false","willPayload":""}]
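A note on the numbers in that flow: the delay node is set to take its delay from msg.delay, which Node-RED reads in milliseconds. The quick Python check below is plain arithmetic, not part of the flow, and the helper names are made up for the example. It suggests the value in the “Pause 30 Minutes” node corresponds to a much longer pause than its name implies.

```python
def minutes_to_ms(minutes):
    """Convert minutes to milliseconds."""
    return int(minutes * 60 * 1000)


def ms_to_hours(ms):
    """Convert milliseconds to hours."""
    return ms / (60 * 60 * 1000)


thirty_minutes_ms = minutes_to_ms(30)
flow_value_hours = ms_to_hours(108000000)

print(thirty_minutes_ms)  # 1800000
print(flow_value_hours)   # 30.0
```

Either value is far beyond the 80-90 seconds reported above, so the difference probably does not affect the test result.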

As I said, “MQTT is not generally used to buffer data” - I didn’t say it couldn’t. I’m fully aware it could but it isn’t configured that way within the EmonSD and EmonScripts installation process.

From an emonhub perspective, now I have added the ability to timestamp the MQTT JSON data, the emonhub could buffer the data and then send it on to the MQTT broker (when it became available) such that the broker then buffers those messages and allows the client to ‘catch up’. (i.e. emonhub buffers then sends data to the broker at a rate the broker can accept it, before the broker publishes it at a rate the subscriber can receive it).
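A minimal Python sketch of that idea, assuming timestamped frames: output is held in a local buffer while paused (or while a send fails), then drained oldest first in limited batches. The class `PausableOutput`, its attributes and the `send` callback are hypothetical names for illustration and are not emonhub's actual buffering code.

```python
from collections import deque


class PausableOutput:
    """Hold timestamped frames while paused, then drain them in order."""

    def __init__(self, send, max_frames=1000):
        self.send = send            # callable(frame) -> True if accepted
        self.paused = False
        self.buffer = deque(maxlen=max_frames)  # oldest dropped when full

    def add(self, timestamp, node, values):
        """Queue a frame; pass it straight on unless output is paused."""
        self.buffer.append((timestamp, node, values))
        if not self.paused:
            self.flush()

    def flush(self, batch_size=None):
        """Send up to batch_size frames, oldest first; return count sent."""
        sent = 0
        while self.buffer and (batch_size is None or sent < batch_size):
            if not self.send(self.buffer[0]):
                break               # keep the frame for the next attempt
            self.buffer.popleft()
            sent += 1
        return sent


received = []


def fake_send(frame):
    """Stand-in for publishing to a broker; always succeeds."""
    received.append(frame)
    return True


out = PausableOutput(fake_send)
out.paused = True
for ts in (1, 2, 3):
    out.add(ts, "emontx1", [100 + ts])
held = len(received)          # nothing sent while paused
out.paused = False
first_batch = out.flush(batch_size=2)
second_batch = out.flush()
```

Calling flush with a batch size is one simple way to limit how fast the backlog is pushed at the broker; each frame keeps its original timestamp.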

I fear that the current implementation of the subscribe process in EmonCMS would not cope even if QoS2 was demanded.

I have previously overloaded the MQTT input process, sending too many topics to the base topic.

Yep, there lies the rub. What you describe there is why emonhub worked the way it did. The bulk input (http) api was to reduce the posting rate to emoncms(.org). HTTP and MQTT are just different transport methods; there is no real reason the input APIs need to differ so much. Using a bulk api for MQTT would allow the buffering and catch up to happen faster and smoother whilst consuming less resource. It should have been there from day one, then the MQTT interfacer and emoncms input would have travelled a very different (faster) development path.
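To make the bulk idea concrete, here is a hypothetical Python sketch of packing several buffered frames into one message. The row layout (timestamp, node, then values) is only loosely modelled on the HTTP bulk input, and `pack_bulk` is an invented name; no such MQTT bulk format is confirmed anywhere in this thread.

```python
import json


def pack_bulk(frames):
    """Pack (timestamp, node, values) frames into one JSON array.

    Each row is [timestamp, node, value1, value2, ...], oldest first,
    so many buffered frames can travel in a single message.
    """
    ordered = sorted(frames, key=lambda frame: frame[0])
    rows = [[ts, node, *values] for ts, node, values in ordered]
    return json.dumps(rows)


bulk = pack_bulk([
    (1600000010, "emontx2", [5]),
    (1600000000, "emontx1", [120, 45]),
])
print(bulk)
```

One message per batch rather than one per value is where the saving in posting rate would come from.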