MQTT Publishing on the ESP8266

I’ve done some tests with the esp8266 and the Lua-based NodeMcu firmware, and I’ve come to the following conclusions:

  • We can only publish to an MQTT topic again once the previous publish call has finished. That’s why we need a semaphore to stop multiple publish requests from running at the same time.
  • MQTT-based code consumes a lot of heap. So much that, to make it work, I have to strip empty lines and comments from the code. It seems that the latest NodeMcu firmware has a node.compile() function that we can use.
  • Using unbounded arrays to store MQTT messages while they wait their turn to be published ends up using all available heap space, and the esp8266 reboots, because setting previous array entries to nil doesn’t free the used space at all.

So I now have sample code that uses a circular buffer to publish messages. The code below has some flaws, namely:

  • It doesn’t check whether the buffer is full; it just overwrites any older unpublished messages.
  • Entries in the buffer have no QoS defined.
  • It has no “intelligence”, for example to only overwrite messages with QoS=0, which might get lost anyway, and keep messages with QoS>0. That would mean that the buffer might eventually get full, and all publishing from the user code could fail.
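The first two flaws can be tackled by keeping an explicit count of stored entries, so that “full” and “empty” are no longer both signalled by pub_head == pub_tail, and by storing the QoS next to each message. The following is a minimal plain-Lua sketch under those assumptions; RingBuffer, push, peek and pop are hypothetical names, not NodeMcu APIs, and the heap cost on the esp8266 has not been measured. push() refuses a new entry when the buffer is full instead of overwriting the oldest one.

```lua
-- Bounded ring buffer for queued MQTT messages (hypothetical sketch, plain Lua).
RingBuffer = {}
RingBuffer.__index = RingBuffer

function RingBuffer.new(size)
  -- head: slot of the oldest entry; count: number of stored entries
  return setmetatable({ size = size, head = 1, count = 0, slots = {} }, RingBuffer)
end

-- Stores an entry; returns false (and stores nothing) when the buffer is full
function RingBuffer:push(topic, message, qos)
  if self.count == self.size then
    return false
  end
  -- 1-based slot index that wraps from size back to 1
  local tail = (self.head - 1 + self.count) % self.size + 1
  self.slots[tail] = { topic = topic, message = message, qos = qos or 0 }
  self.count = self.count + 1
  return true
end

-- Returns the oldest entry without removing it, or nil when empty
function RingBuffer:peek()
  if self.count == 0 then return nil end
  return self.slots[self.head]
end

-- Removes and returns the oldest entry, or nil when empty
function RingBuffer:pop()
  if self.count == 0 then return nil end
  local entry = self.slots[self.head]
  self.slots[self.head] = nil   -- drop the reference so the strings can be collected
  self.head = self.head % self.size + 1
  self.count = self.count - 1
  return entry
end
```

In mqtt_cbuf.lua, publish_data() would call push() and the publish callback would call pop() once the broker call completes. A QoS-aware policy (the third flaw) could be added by searching for the oldest entry with qos == 0 to discard when push() fails.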

Here is the code:

mqtt_cbuf.lua

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16"   -- Ip/hostname of MQTT broker
BRPORT = 1883             -- MQTT broker port
BRUSER = "user"           -- If MQTT authentication is used then define the user
BRPWD  = "pwd"            -- The above user password
CLIENTID = "ESP8266-" ..  node.chipid() -- The MQTT ID. Change to something you like
BFSIZE = 16               -- Number of slots in the publish circular buffer

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} 

-- Control variables.
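pub_sem = 0         -- MQTT publish semaphore: 1 while a publish is in progress
current_topic  = 1  -- index of the topic currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts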

-- Publishing structures
pub_topic = {}
pub_message = {}
pub_head = 1
pub_tail = 1

print("heap: " .. node.heap() )
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)

print "Connecting to MQTT broker. Please wait..."
print("heap: " .. node.heap() )
m:connect( BROKER , BRPORT, 0, function(conn)
     print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
     mqtt_sub() 
end)

function mqtt_sub()
     if table.getn(topics) < current_topic then
          run_main_prog()
     else
          m:subscribe(topics[current_topic] , 0, function(conn)          
          end)
          current_topic = current_topic + 1  
          tmr.alarm(5, topicsub_delay, 0, mqtt_sub )
     end
end

function _publish_data()
   if pub_head ~= pub_tail then
     if pub_sem == 0 then
       pub_sem = 1
       m:publish( pub_topic[pub_head], pub_message[pub_head] ,0 ,0 , function(conn) 
          print("Sending to " .. pub_topic[pub_head] ..": " .. pub_message[pub_head])
          pub_sem = 0
          pub_topic[pub_head] = nil
          pub_message[pub_head] = nil
          pub_head = (pub_head + 1) % BFSIZE
       end)
     end  
   end
end

function publish_data(topic , message )
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = (pub_tail + 1) % BFSIZE
  _publish_data()
end

function publish_data1()
 publish_data("Out1","Mensagem para out1" )
 publish_data("Out2","Mensagem para out2" )
end

function run_main_prog()
     tmr.alarm(2, 5000, 1, publish_data1 )
     m:on("message", function(conn, topic, data)
        print(topic .. ":" )
        if (data ~= nil ) then
          print ( data )
        end
      end )
end

Apparently this code can run for a long time.
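One behaviour worth noting in the code above: _publish_data() only runs from publish_data(), so a message queued while pub_sem is 1 waits for the next publish_data() call. Since publish_data1 queues two messages every 5 seconds but only one goes out per tick, the queue grows by one entry per tick until the buffer wraps. The sketch below is a modified version of the same functions that re-checks the queue when each publish completes. Because the NodeMcu mqtt module isn’t available in plain Lua, it uses a hypothetical FakeClient whose publish() keeps the callback pending until complete() is called, standing in for the broker round-trip. Whether calling m:publish directly from inside the publish callback is safe on the real firmware is an assumption not tested here; deferring it with a one-shot tmr.alarm is the more cautious option.

```lua
-- Hypothetical stand-in for the NodeMcu mqtt client: publish() records the
-- message and keeps its callback pending until complete() is called.
FakeClient = {}
FakeClient.__index = FakeClient

function FakeClient.new()
  return setmetatable({ sent = {}, pending = nil }, FakeClient)
end

function FakeClient:publish(topic, message, qos, retain, callback)
  -- The firmware reports "sending in process" here; the mock asserts instead
  assert(self.pending == nil, "sending in process")
  table.insert(self.sent, topic .. ":" .. message)
  self.pending = callback
end

function FakeClient:complete()
  local cb = self.pending
  self.pending = nil
  if cb then cb(self) end
end

m = FakeClient.new()
BFSIZE = 16
pub_topic, pub_message = {}, {}
pub_head, pub_tail = 1, 1
pub_sem = 0

function _publish_data()
  if pub_head ~= pub_tail and pub_sem == 0 then
    pub_sem = 1
    m:publish(pub_topic[pub_head], pub_message[pub_head], 0, 0, function(conn)
      pub_topic[pub_head] = nil
      pub_message[pub_head] = nil
      pub_head = (pub_head + 1) % BFSIZE
      pub_sem = 0       -- release first, otherwise the next call is blocked
      _publish_data()   -- drain: start the next queued message, if any
    end)
  end
end

function publish_data(topic, message)
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = (pub_tail + 1) % BFSIZE
  _publish_data()
end
```

The only functional change from the original is the _publish_data() call at the end of the callback, after the semaphore has been released.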

Also, at the beginning, I print the free heap before loading the code and connecting to the MQTT broker.

  • After a reset: node.heap() = 19600 (with the init.lua file)
  • Running the above file (dofile("mqtt_cbuf.lua")): node.heap() = 10560 after compiling and before connecting to the MQTT broker
  • After connecting: node.heap() = 8136

If we use the new node.compile function, node.compile("mqtt_cbuf.lua") creates a new file named mqtt_cbuf.lc. So:

  • After a reset: node.heap() = 19472 (with the init.lua file)
  • Running the compiled file (dofile("mqtt_cbuf.lc")): node.heap() = 12160 after loading and before connecting to the MQTT broker
  • After connecting: node.heap() = 9720

A bit better. Not much, but it makes a difference.
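Subtracting the figures above shows where the saving comes from. This plain-Lua snippet only does the arithmetic on the numbers quoted in this post (usage is a hypothetical helper name): loading the precompiled .lc file costs about 1.7 KB less heap, while connecting to the broker costs roughly the same 2.4 KB in both cases.

```lua
-- Free heap in bytes, as quoted above (node.heap() readings)
source   = { reset = 19600, loaded = 10560, connected = 8136 }  -- dofile("mqtt_cbuf.lua")
compiled = { reset = 19472, loaded = 12160, connected = 9720 }  -- dofile("mqtt_cbuf.lc")

-- Hypothetical helper: heap used by loading the script, then by connecting
function usage(h)
  return h.reset - h.loaded, h.loaded - h.connected
end

src_load, src_conn = usage(source)    -- 9040, 2424
lc_load,  lc_conn  = usage(compiled)  -- 7312, 2440
print("load saving with .lc: " .. (src_load - lc_load))                               -- 1728
print("extra free heap after connecting: " .. (compiled.connected - source.connected))  -- 1584
```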

Since even a small program like the above can fail to run for lack of heap space, solving some of the issues I mentioned at the beginning of this post might only be feasible if really needed. Let’s see if I can improve this.

ESP8266: Logging data in a MySQL database

This post shows how to store data coming from the esp8266 in a MySQL database.
I assume the following architecture:
– A LAMP server (Linux, Apache, MySQL and PHP) or a WAMP server (on Windows).
– An ESP8266 running the NodeMCU firmware with the Lua language, which periodically calls the LAMP server and passes the data in the HTTP request query string.

There are other ways of passing data, for example using MQTT with Node-RED storing the data into MySQL, but that is another story.

Creating the database:
Let’s create the database. Log in to the LAMP server and, as a user with enough privileges (normally root), run the mysql command-line client.

Create a user for accessing the database server:
mysql> create user 'esp8266'@'localhost' identified by 'secretpwd';
Query OK, 0 rows affected (0.06 sec)

Create the database:
mysql> create database esp8266;
Query OK, 1 row affected (1.40 sec)

Grant the necessary privileges to access the database engine:
mysql> grant usage on *.* to 'esp8266'@'localhost' with max_queries_per_hour 10000;
Query OK, 0 rows affected (0.07 sec)

In the above statement, the with max_queries_per_hour clause is optional and can be left out if there is no reason to limit resource access:
mysql> grant usage on *.* to 'esp8266'@'localhost';
Query OK, 0 rows affected (0.02 sec)

And let’s make sure that the esp8266 mysql user can do whatever it needs on the esp8266 database:
mysql> grant all on esp8266.* to 'esp8266'@'localhost';
Query OK, 0 rows affected (0.02 sec)

Now we can quit and log back in to the newly created database as the esp8266 user to create, in the case of this post, a single table for storing data:

mysql -u esp8266 -psecretpwd esp8266

And create the table:

mysql> create table DataTable ( logdate DATETIME , field VARCHAR(64), value BIGINT);
Query OK, 0 rows affected (0.70 sec)

I’ve added a column named field so that, in the future, we can query data by field.

Storing data:

Storing data is done through the following PHP page, called store.php.

This code only stores data if the request’s query string has a parameter named heap.

<?php
$servername = "localhost";
$username = "esp8266";
$password = "secretpwd";
$dbname = "esp8266";

$now = new DateTime();
parse_str( html_entity_decode( $_SERVER['QUERY_STRING']) , $out);

if ( array_key_exists( 'heap' , $out ) ) {
    // Create connection
    $conn = new mysqli($servername, $username, $password, $dbname);
    // Check connection
    if ($conn->connect_error) {
        die("Connection failed: " . $conn->connect_error);
    }

    $datenow = $now->format('Y-m-d H:i:s');
    // Cast to an integer so the query string cannot inject SQL into the statement
    $hvalue  = intval($out['heap']);

    $sql = "INSERT INTO DataTable ( logdate , field  , value) VALUES ( '$datenow' , 'heap', $hvalue )";

    if ($conn->query($sql) === TRUE) {
        echo "New record created successfully";
    } else {
        echo "Error: " . $sql . "<br>" . $conn->error;
    }

    $conn->close();
}
?>

The esp8266 code:

The esp8266 code is based on code from some of my previous posts; basically, if we have a WiFi connection, the following code segment (stored in a file called ping.lua) is called periodically:


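conn=net.createConnection(net.TCP, false)
conn:on("receive", function(conn, payload) print("Get done.", payload) end )
-- Send the request only after the TCP connection is established
conn:on("connection", function(conn)
  -- Connection: close, since a new socket is opened on every run
  conn:send("GET /store.php?heap=" .. node.heap() .." HTTP/1.1\r\nHost: xx.xx.xx.xx\r\n" .. "Connection: close\r\nAccept: */*\r\n\r\n")
end )
conn:connect(80,"xx.xx.xx.xx")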
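The request above concatenates the raw value straight into the URL, which is fine for node.heap() because it is always a number. If ping.lua is extended to send other values, such as text, those values should be percent-encoded first (and store.php, as written, only reads heap). Here is a plain-Lua sketch under that assumption; urlencode and build_request are hypothetical helper names, not NodeMcu functions, and the fields are sorted only to make the output deterministic.

```lua
-- Percent-encode a query-string value (RFC 3986 unreserved characters kept as-is)
function urlencode(s)
  s = tostring(s)
  return (s:gsub("[^%w%-%._~]", function(c)
    return string.format("%%%02X", string.byte(c))
  end))
end

-- Build an HTTP/1.1 GET request for a PHP page from a table of fields
function build_request(host, path, fields)
  local parts = {}
  for k, v in pairs(fields) do
    parts[#parts + 1] = urlencode(k) .. "=" .. urlencode(v)
  end
  table.sort(parts)   -- deterministic order, handy for logging and tests
  return "GET " .. path .. "?" .. table.concat(parts, "&") .. " HTTP/1.1\r\n"
      .. "Host: " .. host .. "\r\n"
      .. "Connection: close\r\nAccept: */*\r\n\r\n"
end
```

On the module, the result would be handed to the socket once it is connected, e.g. conn:send(build_request("xx.xx.xx.xx", "/store.php", { heap = node.heap() })).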

Viewing data:

We can view the data stored in the MySQL database with the following code (list.php):

<?php
$servername = "localhost";
$username = "esp8266";
$password = "secretpwd";
$dbname = "esp8266";

parse_str( html_entity_decode( $_SERVER['QUERY_STRING']) , $out);

// Create connection (needed first so the field value can be escaped)
$conn = new mysqli($servername, $username, $password, $dbname);
// Check connection
if ($conn->connect_error) {
    die("Connection failed: " . $conn->connect_error);
}

$sqlq = "Select * from DataTable ";

if ( array_key_exists( 'field' , $out ) ) {
    // Escape the user-supplied value before putting it into the SQL text
    $sqlq = $sqlq . " where field = '" . $conn->real_escape_string($out['field']) . "'";
}

echo htmlspecialchars($sqlq);

$result = mysqli_query( $conn , $sqlq );

if ( $result && $result->num_rows > 0 ) {

    echo "<table border='1'>
<tr>
<th>Log Date</th>
<th>Field</th>
<th>Value</th>
</tr>";

    while($row = mysqli_fetch_array($result))
    {
        echo "<tr>";
        echo "<td>" . htmlspecialchars($row['logdate']) . "</td>";
        echo "<td>" . htmlspecialchars($row['field']) . "</td>";
        echo "<td>" . htmlspecialchars($row['value']) . "</td>";
        echo "</tr>";
    }
    echo "</table>";
}
$conn->close();

?>

If we call the list.php file without parameters, it will show all the available data. Otherwise it only shows what is specified on the field query parameter, for example: http://address/list.php?field=sensor1

Logstash: How to trigger an alarm or event

The ELK stack (Elasticsearch, Logstash and Kibana) is a great tool for centralized log monitoring. The following configuration is what I use to trigger an alarm or event when something that needs immediate attention shows up in a log file.

The tip is quite easy to implement: it is done by changing the configuration file of the Logstash instance that moves data from the Redis database into Elasticsearch.

For example, the base configuration file is something like this:

input {
  redis {
    host => "127.0.0.1"
    # these settings should match the output of the agent
    data_type => "list"
    key => "logstash"
    # We use the 'json' codec here because we expect to read
    # json events from redis.
    codec => json
  }
}

output {
  #stdout { debug => true debug_format => "json"}
  elasticsearch {
    host => "127.0.0.1"
  }
}
This comes out straight from the Logstash documentation.

What if I want to do something when some event/string shows up in the log files?

All that is needed is to change the output section to the following:

output {
  #stdout { debug => true debug_format => "json"}

  elasticsearch {
    host => "127.0.0.1"
  }

  if "com.megacorp.security.exception.Exception" in [message] {
    exec {
      command => "/home/megacorp/monit/bin/notify.sh Error_message &"
    }
  }
}

So all that is needed is to add the if clause; the condition behaves like grep on the JSON message field.
We can add this section several times to call any external command with exec, send mail, or do anything else that Logstash supports. If using the exec plugin, make sure that the command has the & operator at the end so that it runs in the background and does not block the Logstash agent.
Also, the [message] JSON field can be changed to any other field that the JSON event provides, like [tags] or [type].

ESP8266, NodeMcu and MQTT: Event publishing queueing

The MQTT code in the following post, which I copied and adapted (https://primalcortex.wordpress.com/2015/02/06/nodemcu-and-mqtt-how-to-start/), works fine for subscribing to topics as long as you don’t overload it (https://primalcortex.wordpress.com/2015/02/13/esp8266-nodemcu-and-mqtt-event-subscription-load-testing/).

The code subscribes to an array of topics, and this works more or less fine. But, for testing purposes, it also creates a timer that periodically calls the publish_data functions. In reality, imagine that we want to publish something when an input pin changes value. We can do that with the gpio.trig function. For example:

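gpio.mode(5, gpio.INT)  -- pins must be in interrupt mode before gpio.trig is used
gpio.mode(4, gpio.INT)
gpio.trig(5, "down",publish_data1)
gpio.trig(4, "down",publish_data2)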

Here publish_data# are functions that publish something to an MQTT topic. The problem is that both triggers might fire at the same time (two buttons pressed at once, for example) and try to publish to an MQTT topic simultaneously.
Let’s simulate this (please note that this code is a section of the code in the MQTT Getting Started post, where m, pub_sem and id1 are defined):

-- Sample publish functions:
function publish_data(topic, message)
    if pub_sem == 0 then
        pub_sem = 1
        m:publish( topic, message ,0 ,0 , function(conn)
            print("Sending to " .. topic ..": " .. message)
            pub_sem = 0
            id1 = id1 +1
        end)
    end
end

function publish_data1()
    publish_data("Out1","Mensagem para out1" )
    publish_data("Out2","Mensagem para out2" )
end

In the above code sample, the Out2 message is never published: the Out1 message takes time to go out, so when publish_data is called for Out2 the semaphore is still set and the message is dropped. The semaphore is there because we can’t call m:publish again before the previous call has ended.
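The loss can be reproduced off the module with a stand-in client. In this plain-Lua sketch, m is a hypothetical mock (not the NodeMcu mqtt object) whose publish() only records the topic and keeps the completion callback, which is then fired by hand, as the network would do later on the esp8266. The second call finds pub_sem still set to 1 and returns without publishing.

```lua
-- Hypothetical mock of the mqtt client: publish() records the topic
-- and keeps the completion callback for later.
sent, pending = {}, nil
m = {}
function m:publish(topic, message, qos, retain, cb)
  sent[#sent + 1] = topic
  pending = cb
end

pub_sem, id1 = 0, 0

-- Same logic as publish_data() above
function publish_data(topic, message)
  if pub_sem == 0 then
    pub_sem = 1
    m:publish(topic, message, 0, 0, function(conn)
      pub_sem = 0
      id1 = id1 + 1
    end)
  end
  -- when pub_sem == 1 the message is simply dropped
end

publish_data("Out1", "Mensagem para out1")
publish_data("Out2", "Mensagem para out2")  -- pub_sem is still 1: dropped
pending(m)                                   -- Out1's publish completes later
print(#sent, sent[1])                        -- only Out1 reached the client
```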

So we are going to do some queueing for publishing:

I’m using an unbounded Lua array with two pointers, one for the head and another for the tail.
Please note that this code is not perfect: the last queued message is only sent when a new message is added.
Also note that this simple code takes huge amounts of memory, even in the “compile/interpretation” phase.
But it works for a while before crashing from lack of memory.

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16" -- Ip/hostname of MQTT broker
BRPORT = 1883 -- MQTT broker port
BRUSER = "user" -- If MQTT authentication is used then define the user
BRPWD = "pwd" -- The above user password
CLIENTID = "ESP8266-" .. node.chipid() -- The MQTT ID. Change to something you like

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} -- Add/remove topics to the array

-- Control variables.
pub_sem = 0 -- MQTT Publish semaphore. Stops the publishing when the previous hasn't ended
current_topic = 1 -- variable for one currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts, worked for me (local network) down to 5...YMMV

-- Publishing structures
pub_topic = {}
pub_message = {}
pub_head = 1
pub_tail = 1

-- connect to the broker
print("heap: " .. node.heap() )
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)

print "Connecting to MQTT broker. Please wait..."
print("heap: " .. node.heap() )
m:connect( BROKER , BRPORT, 0, function(conn)
    print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
    mqtt_sub() --run the subscription function
end)

function mqtt_sub()
    if table.getn(topics) < current_topic then
        run_main_prog()
    else
        m:subscribe(topics[current_topic] , 0, function(conn)
        end)
        current_topic = current_topic + 1
        tmr.alarm(5, topicsub_delay, 0, mqtt_sub )
    end
end

function _publish_data()
    if pub_head ~= pub_tail then
        if pub_sem == 0 then
            pub_sem = 1
            m:publish( pub_topic[pub_head], pub_message[pub_head] ,0 ,0 , function(conn)
                print("Sending to " .. pub_topic[pub_head] ..": " .. pub_message[pub_head])
                pub_sem = 0
                pub_topic[pub_head] = nil
                pub_message[pub_head] = nil
                pub_head = pub_head + 1
            end)
        end
    end
end

function publish_data(topic , message )
    pub_topic[pub_tail] = topic
    pub_message[pub_tail] = message
    pub_tail = pub_tail + 1
    _publish_data()
end

function publish_data1()
    publish_data("Out1","Mensagem para out1" )
    publish_data("Out2","Mensagem para out2" )
end

function run_main_prog()
    print("Main program")

    tmr.alarm(2, 5000, 1, publish_data1 )

    -- Callback to receive the subscribed topic messages.
    m:on("message", function(conn, topic, data)
        print(topic .. ":" )
        if (data ~= nil ) then
            print ( data )
        end
    end )
end

ESP8266, NodeMcu and MQTT: Event subscription load testing

I’ve been doing some testing with the latest NodeMcu regarding MQTT support. My experience is, more or less, documented here: https://primalcortex.wordpress.com/2015/02/06/nodemcu-and-mqtt-how-to-start/ .

I wanted to check whether concurrent subscriptions work fine, since I already know that publishing is not straightforward. In the code from the above link, the esp8266 subscribes to four topics, named topic1, topic2 and so on.

This simple bash script repeatedly calls the mosquitto publishing client, mosquitto_pub:

#!/bin/bash
for i in {1..60}
do
/home/pcortex/01.Develop/mosquitto/mosquitto-1.3.5/client/mosquitto_pub -h 192.168.1.16 -m "Mensagem $i" -t topic1
/home/pcortex/01.Develop/mosquitto/mosquitto-1.3.5/client/mosquitto_pub -h 192.168.1.16 -m "Mensagem $i" -t topic2
done

With low loop counts, the MQTT code on the esp8266 seems to work fine, but with higher values, such as the 60 above, the esp8266 hangs and reboots. One interesting thing was that, after some testing, neither a reset nor completely powering it off brought the esp8266 back online. It just put out garbage on the serial output and that was it. Strange indeed.

So I reflashed it with the latest NodeMcu version (lots of reflashing, hey :) ), but the esp8266 still did not recover! Only garbage came out when connecting to the serial port (and no, it wasn’t a serial baud rate issue). So I reflashed the original AT firmware, which brought it back to life, and then reflashed the latest NodeMcu version again.

Adding a delay on the bash script, before the done line, like sleep 1 seems to keep the esp8266 running without issue.

So indeed this shows, that bombarding the esp8266 with MQTT messages in burst mode is not a good idea…

Nodemcu and MQTT: How to start

The latest NodeMcu version has MQTT protocol support built in. MQTT is one of the protocols commonly used for building IoT (Internet of Things) devices.

So I’ve done some tests using the new MQTT functions, with code based on this thread in the ESP8266 forum http://www.esp8266.com/viewtopic.php?f=19&t=1278&start=40 (my code is based on the code posted by user helix. Thanks!). I’ve made some changes to make things work in a more robust way, which I’m documenting in this post.

My initial tests were done with NodeMcu version 20150123, but I’ll write this post based on NodeMCU 0.9.5 build 20150127, so if you are using an older or newer version of the firmware, the code in this post may behave a bit differently from what is expected. Also, I’m using an MQTT broker named RSMB (Really Small Message Broker) running on my Synology NAS, but any MQTT broker will do. For testing I’m using the great MQTT-SPY program.

The init.lua code that I use is as documented in this post: https://primalcortex.wordpress.com/2014/12/30/esp8266-nodemcu-and-lua-language-and-some-arduino-issues/ . That code makes sure that a WiFi connection is made before continuing. The only difference is in the launch function of init.lua: if a successful connection is made, instead of calling a ping.lua file repeatedly, a mqtt.lua file is called once. To do this, just change CMDFILE to CMDFILE = "mqtt.lua". Note that the timer is now one-shot: it calls the mqtt.lua file and that’s it. Then the mqtt.lua file takes control.

CMDFILE = "mqtt.lua"   -- File that is executed after connection
-- Change the code of this function that it calls your code.
function launch()
  print("Connected to WIFI!")
  print("IP Address: " .. wifi.sta.getip())
  tmr.alarm(0, 1000, 0, function() dofile(CMDFILE) end )  -- Zero as third parameter. Call once the file.
end  -- !!! Increase the delay to like 10s if developing mqtt.lua file otherwise firmware reboot loops can happen

The MQTT API is used by basically creating an MQTT Client with mqtt.Client(), then connecting, and finally we can subscribe or post to topics. Subscriptions will call a callback function when data is received, and Publishing will also call a callback function when data is sent.

It also seems that only one message can be published at a time; otherwise we get an error like unprotected error in call to Lua API (pir.lua:34: sending in process) and the module reboots. As a first approach, I’ve solved the issue with a semaphore that blocks a new call until the previous one has finished.

The following code uses two different functions to publish to topics, due to the use of the publish semaphore. Please read the code comments to follow how the code works:

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16"   -- Ip/hostname of MQTT broker
BRPORT = 1883             -- MQTT broker port
BRUSER = "user"           -- If MQTT authentication is used then define the user
BRPWD  = "pwd"            -- The above user password
CLIENTID = "ESP8266-" ..  node.chipid() -- The MQTT ID. Change to something you like

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} -- Add/remove topics to the array

-- Control variables.
pub_sem = 0         -- MQTT Publish semaphore. Stops the publishing when the previous hasn't ended
current_topic  = 1  -- variable for one currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts, worked for me (local network) down to 5...YMMV
id1 = 0
id2 = 0

-- connect to the broker
print "Connecting to MQTT broker. Please wait..."
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)
m:connect( BROKER , BRPORT, 0, function(conn)
     print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
     mqtt_sub() --run the subscription function
end)

function mqtt_sub()
     if table.getn(topics) < current_topic then
          -- if we have subscribed to all topics in the array, run the main prog
          run_main_prog()
     else
          --subscribe to the topic (keep a copy: the callback runs later, after current_topic has moved on)
          local topic = topics[current_topic]
          m:subscribe(topic , 0, function(conn)
               print("Subscribing topic: " .. topic )
          end)
          current_topic = current_topic + 1  -- Goto next topic
          --set the timer to rerun the loop as long there is topics to subscribe
          tmr.alarm(5, topicsub_delay, 0, mqtt_sub )
     end
end

-- Sample publish functions:
function publish_data1()
   if pub_sem == 0 then  -- Is the semaphore set?
     pub_sem = 1  -- Nop. Let's block it
     m:publish("temperature","hello",0,0, function(conn) 
        -- Callback function. We've sent the data
        print("Sending data1: " .. id1)
        pub_sem = 0  -- Unblock the semaphore
        id1 = id1 +1 -- Let's increase our counter
     end)
   end  
end

function publish_data2()
   if pub_sem == 0 then
     pub_sem = 1
     m:publish("ts","hello",0,0, function(conn) 
        print("Sending data2: " .. id2)
        pub_sem = 0
        id2 = id2 + 1
     end)
   end  
end

--main program to run after the subscriptions are done
function run_main_prog()
     print("Main program")
     
     tmr.alarm(2, 5000, 1, publish_data1 )
     tmr.alarm(3, 6000, 1, publish_data2 )
     -- Callback to receive the subscribed topic messages. 
     m:on("message", function(conn, topic, data)
        print(topic .. ":" )
        if (data ~= nil ) then
          print ( data )
        end
      end )
end

This seems to work fine for some time, but then, even with some free heap left, it stops working. No idea why. Even the subscriptions stop working, and at the broker I get an ending-connection event due to the keepalive time-out.

20150206 213132.389 CWNAN0033I Connection attempt to listener 1883 received from client ESP8266-10350678 on address 192.168.1.82:32456 
20150206 214010.033 CWNAN0024I 120 second keepalive timeout for client ESP8266-10350678, ending connection

At this stage, when things seem locked, the semaphore is “red”: pub_sem = 1…

Another interesting note: if timers 2 and 3 in the main function have the same interval, publish_data2 never publishes anything. For example:

     tmr.alarm(2, 5000, 1, publish_data1 )
     tmr.alarm(3, 5000, 1, publish_data2 )

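Interesting… Probably both timers fire at the same time and only the first one gets through: publish_data1 sets pub_sem = 1, so when publish_data2 runs right after it, it finds the semaphore taken and returns without publishing.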

Second, with this approach a separate publish_data function must be written for every topic, which increases contention for the semaphore.

I’ll keep this post short and end it here, but I’ll write a follow up post where the publish_data function is unified for any topic, and, I hope, we will be using queues for queueing the messages to be published.

NodeMcu and Heap utilization: Great improvements

Since I saw that the MQTT client was ported into the NodeMCU firmware, which allows the ESP8266 to be programmed in the Lua language, and knowing the heap issues that this firmware has had in the past, I gave it another try to see how things have improved.

As we can see in the graph below, things are not totally solved, but it’s waaaay better…

[Graph of NodeMcu heap utilization (image not reproduced)]

This means that, at least for my sample code, things are quite stable now. Not a single reboot since the 24th of January. Not bad. A new stability milestone has been achieved, making the NodeMcu firmware a firm contender for building applications on the esp8266 without the steep native SDK learning curve.