Setting up an IoT framework/dashboard with NodeRed, Mosca/Mosquitto and Freeboard.io dashboard

With the low cost of the esp8266 chip and boards, it is now easy and affordable to add connectivity to almost anything that we can think of. Still, if we build an Internet of Things device by using the esp8266 chip to connect it to the internet, or to other networks, what can we do with the data that we gather? Basically we do what we do with any data that we get: store it, process it and show it, and that’s what we are going to see in this post.

The software (running on Linux platform) that I’ll use is the following:

IBM NodeRed: (http://nodered.org/) This great software will allow us to get data, process it, store it and publish it, using a workflow based designer. It runs on Node.js, the JavaScript based server engine.

MQTT Broker Mosca: (http://www.mosca.io/) There are several MQTT brokers available, but this one runs on Node.js, so it is platform agnostic and runs wherever Node.js runs. It also has websocket support out of the box, which allows us to use the Freeboard.io dashboard in a straightforward way.

Freeboard.io Dashboard: (http://freeboard.io/) A great dashboard, freely available at the project GitHub page (https://github.com/Freeboard/freeboard). It allows us to design and store several dashboards with some cool widgets.

Step 1: Setting up NodeRed:

First of all make sure that you have node and npm (the node package manager) installed on your distribution. You can see that with the commands node -v and npm -v for checking out their versions. In my case, and for now, it’s version v0.10.33 for nodejs and 1.4.28 for npm.

First I’ve created a directory named IoTServer where I’ll keep all software and configurations.

Download the zip file from nodered GitHub page https://github.com/node-red/node-red or just clone the repository. I clone the repository, because it’s simpler in the future to update it with just git pull. First just make sure that you have the icu/libicu libraries installed.

cd ~
mkdir IoTServer
cd IoTServer
git clone https://github.com/node-red/node-red
cd node-red
npm install

We can now run the nodered server with the command node red.js -v. I use -v at the end to see if any error crops up and also to see if any node modules are missing, and need to be installed. For example:

pcortex@cloudsrv:~/IoTServer/node-red$ node red.js -v  
   Welcome to Node-RED 
   =================== 
 
24 Feb 11:35:16 - [info] Node-RED version: v0.10.3.git 
24 Feb 11:35:16 - [info] Node.js  version: v0.10.33 
24 Feb 11:35:16 - [info] Loading palette nodes 
24 Feb 11:35:59 - [warn] ------------------------------------------ 
24 Feb 11:35:59 - [warn] [arduino] Error: Cannot find module 'arduino-firmata' 
24 Feb 11:35:59 - [warn] [rpi-gpio] Info : Ignoring Raspberry Pi specific node. 
24 Feb 11:35:59 - [warn] [redisout] Error: Cannot find module 'redis' 
24 Feb 11:35:59 - [warn] [mongodb] Error: Cannot find module 'mongodb' 
24 Feb 11:35:59 - [warn] ------------------------------------------ 
24 Feb 11:35:59 - [info] Server now running at http://127.0.0.1:1880/ 
24 Feb 11:35:59 - [info] Flows file not found : flows_clouds.json 
24 Feb 11:35:59 - [info] Starting flows

As we can see, the node modules for arduino, redisout and mongodb are missing, and the rpi-gpio node is ignored because it is Raspberry Pi specific. If we are going to use the missing modules, we need to install them with the npm tool. Also, the message Flows file not found : flows_clouds.json informs us that no saved nodered workflows were found. We can now access nodered at the address http://localhost:1880/. If we want to change the bind address or add authentication, we should change the settings.js file. Take a look at http://nodered.org/docs/configuration.html for more information, and start nodered with: node red.js -v -s settings.js

We can now start designing our nodered workflows that receive data from the esp8266 via MQTT protocol or even by simple REST based HTTP protocol, process it and store the data payload, if any.

For storing data into a database, we can use MongoDB, or mysql, for example, installing the needed modules.

For mysql we can do:

cd ~/IoTServer/node-red
npm install bignumber.js require-all readable-stream
npm install mysql
npm install node-red-node-mysql

After that we need to restart nodered. The mysql node should now be available in the storage palette.

Step 2: Installing Mosca MQTT broker

We can use several brokers, but since we are already running nodered on Node.js, we will use Mosca. Both nodered and Mosca can provide a websocket based interface. This is of interest because we can use browser based websocket applications, like freeboard.io, to consume data from our IoT devices.

cd ~
sudo -s
sudo npm install mosca bunyan -g

And to run it just do: mosca -v | bunyan

For running it with websockets enabled, on TCP port 3000, just do:

mosca -v --http-port 3000 --http-bundle --http-static ./ | bunyan

Check out the documentation for more information at: https://github.com/mcollina/mosca/wiki/MQTT-over-Websockets

Step 2 (alternative): Mosquitto MQTT broker with Websockets support

If we have trouble installing or using Mosca, we can use the latest Mosquitto version, 1.4, which has websockets support.

For that we need to make sure that cmake and uuid/uuid-dev are installed (For example: sudo apt-get install cmake uuid uuid-dev).

We also need to download the libwebsockets library, compile it and install it:

cd ~/IoTServer
git clone git://git.libwebsockets.org/libwebsockets
cd libwebsockets
mkdir build
cd build
cmake ..
make
sudo make install

We then need to download and compile Mosquitto 1.4:

cd ~/IoTServer
wget http://mosquitto.org/files/source/mosquitto-1.4.tar.gz
tar xvzf mosquitto-1.4.tar.gz
cd mosquitto-1.4

We need to edit the config.mk file and change the option WITH_WEBSOCKETS:=no to WITH_WEBSOCKETS:=yes

And finally:

make
sudo make install

To activate websocket support we need to edit the mosquitto.conf file located at /etc/mosquitto or some other directory and add the following lines, for example at the end of the file:

listener 1883

listener 9001 127.0.0.1
protocol websockets

Then we run it with:

pcortex@cloudsrv:~/IoTServer$ mosquitto -c /etc/mosquitto/mosquitto.conf
1424790588: mosquitto version 1.4 (build date 2015-02-24 14:38:47+0000) starting 
1424790588: Config loaded from /etc/mosquitto/mosquitto.conf. 
1424790588: Opening ipv4 listen socket on port 1883. 
1424790588: Opening ipv6 listen socket on port 1883. 
1424790588: Opening websockets listen socket on port 9001.

Step 3: Installing freeboard.io dashboard

We are almost at the end of this long post. To complete our infrastructure we now need to install the dashboard that will allow us to see the data in the browser:

cd ~/IoTServer
git clone https://github.com/Freeboard/freeboard.git

For now I’ll just use Apache, which is already installed on my server, to serve the freeboard dashboards. Because freeboard is not installed in the web root directory, we need to make it available to Apache. The easiest way to do this, as long as Apache has the FollowSymLinks option enabled for the document root, is to create a link to the freeboard directory in the web document root:

sudo -s
cd /var/www
ln -s /home/pcortex/IoTServer/freeboard iot

And now freeboard is available at the url http://myserveraddress/iot.

Finally, we need to add MQTT over Websockets support to freeboard… We are almost there. This post http://jpmens.net/2014/11/12/freeboard-a-versatile-dashboard/ shows how it’s done, but I’ll repeat it here:

1st: Download the development version of mqtt.js here: (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.javascript.git/plain/src/mqttws31.js?h=develop) and save it:

cd ~/IoTServer/freeboard/plugins
mkdir mqtt
cd mqtt
wget --output-document mqttws31.js http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.javascript.git/plain/src/mqttws31.js?h=develop

We will now download the freeboard MQTT plugin:

cd ~/IoTServer
git clone https://github.com/alsm/freeboard-mqtt
cd freeboard/plugins/mqtt
cp ~/IoTServer/freeboard-mqtt/paho.mqtt.plugin.js .

We need to edit the paho.mqtt.plugin.js file and change the line:

                "external_scripts" : [
                        "<full address of the paho mqtt javascript client>" 
                ],

to

               "external_scripts" : [
                        "plugins/mqtt/mqttws31.js" 
                ],

and finally we need to change the index.html file from:

   <script type="text/javascript">
        head.js("js/freeboard+plugins.min.js", 
                // *** Load more plugins here *** 
                function(){

to

   <script type="text/javascript">
        head.js("js/freeboard+plugins.min.js", 
                "plugins/mqtt/paho.mqtt.plugin.js", 
                // *** Load more plugins here *** 
                function(){

That’s it. We are finally ready. All infrastructure is done. We need now just to configure it.

Final step: Testing it out with mosquitto websockets!:

Let’s start Mosquitto: nohup mosquitto -c /etc/mosquitto/mosquitto.conf &

And check if the WS protocol is active:

netstat -nap | grep 9001

tcp        0      0 0.0.0.0:9001            0.0.0.0:*               LISTEN      18973/mosquitto

Setting up Freeboard.io: It’s very important to notice that the dashboard runs in our browser, which in fact makes our browser the MQTT client. This means that when we configure freeboard, we must take this into account when filling in the MQTT broker address. It will be localhost if the browser AND the broker run on the same machine, but for the dashboard to work from anywhere we should use the public ip of the machine running the broker:

Access the freeboard.io dashboard, and select Datasources -> Add. From the dropdown box, we select the Paho MQTT provider.


We need to configure the data source as follows:

[Screenshot: datasource settings]

Change the MQTT server address to something that makes sense in your config.

For Mosquitto with websockets enabled, the port is 9001, and for Mosca, the port is 3000, but of course this can change, depending on the configuration used.

We can now add a Pane and a gauge to see data from the topic /data that we defined on the above datasource.

So, Add Pane, and on the new Pane select the + (plus) to add a widget. In our case we will select Gauge. We fill out the required info and on the Value field we press Datasource, and select the newly created datasource named Data. Because we are not using JSON for the data, we just append .msg at the end. So something like datasources["data"].msg should be written.


We press Save, and that’s it.

We can now publish into the topic, and the dashboard should update:


For example, in this case: mosquitto_pub -t /data -m 23

That’s it. It works fine with Mosquitto 1.4 with websockets enabled and with the Mosca MQTT broker.

Final setup:

We should make our dashboard permanent. To do so we need to save the dashboard locally as a JSON file, and then move it to the server running the dashboard, into the root directory of freeboard.

After that we call directly the saved dashboard with the following URL:

http://myserveraddress/iot/index.html?load=dashboard.json

MQTT Publishing on the ESP8266

I’ve done some tests with esp8266 and NodeMcu Lua based firmware, and I’ve come to the following conclusions:

  • We can only publish again to an MQTT topic if, and only if, the previous publish call has finished. That’s why we need to use a semaphore to stop multiple publish requests at the same time.
  • MQTT based code consumes a lot of heap. So much, that to make it work I had to strip empty lines and comments from the code. It seems that the latest NodeMcu firmware does have a node.compile() function that we can use.
  • Using unbounded arrays for storing MQTT messages while they wait for their turn to be published will end up using all the available heap space, and the esp8266 will reboot, because setting previous array entries to nil won’t free the used space at all.

So I’ve now a sample code that uses a circular buffer to publish messages. The code below has some flaws, namely:

  • Doesn’t check if the buffer is full, it will just overwrite any older unpublished messages.
  • Entries on the buffer have no QOS defined.
  • Has no “intelligence” for example to only override messages with QOS=0 that might get lost anyway, and keep QOS>0 messages. That would mean that eventually the buffer might get full, and all publishing from the user code can fail.

Here is the code:

mqtt_cbuf.lua

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16"   -- Ip/hostname of MQTT broker
BRPORT = 1883             -- MQTT broker port
BRUSER = "user"           -- If MQTT authentication is used then define the user
BRPWD  = "pwd"            -- The above user password
CLIENTID = "ESP8266-" ..  node.chipid() -- The MQTT ID. Change to something you like
BFSIZE = 16

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} 

-- Control variables.
pub_sem = 0         
current_topic  = 1  
topicsub_delay = 50

-- Publishing structures
pub_topic = {}
pub_message = {}
pub_head = 1
pub_tail = 1

print("heap: " .. node.heap() )
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)

print "Connecting to MQTT broker. Please wait..."
print("heap: " .. node.heap() )
m:connect( BROKER , BRPORT, 0, function(conn)
     print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
     mqtt_sub() 
end)

function mqtt_sub()
     if table.getn(topics) < current_topic then
          run_main_prog()
     else
          m:subscribe(topics[current_topic] , 0, function(conn)          
          end)
          current_topic = current_topic + 1  
          tmr.alarm(5, topicsub_delay, 0, mqtt_sub )
     end
end

function _publish_data()
   if pub_head ~= pub_tail then
     if pub_sem == 0 then
       pub_sem = 1
       m:publish( pub_topic[pub_head], pub_message[pub_head] ,0 ,0 , function(conn) 
          print("Sending to " .. pub_topic[pub_head] ..": " .. pub_message[pub_head])
          pub_sem = 0
          pub_topic[pub_head] = nil
          pub_message[pub_head] = nil
          pub_head = (pub_head + 1) % BFSIZE
       end)
     end  
   end
end

function publish_data(topic , message )
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = (pub_tail + 1) % BFSIZE
  _publish_data()
end

function publish_data1()
 publish_data("Out1","Mensagem para out1" )
 publish_data("Out2","Mensagem para out2" )
end

function run_main_prog()
     tmr.alarm(2, 5000, 1, publish_data1 )
     m:on("message", function(conn, topic, data)
        print(topic .. ":" )
        if (data ~= nil ) then
          print ( data )
        end
      end )
end

Apparently this code can run for a long time.
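One of the flaws listed above is that a full buffer silently overwrites older unpublished messages. As a minimal sketch of how a full check could look, here is a small Python model of the circular buffer. Python is used only to illustrate the logic, and the class and method names (PublishRing, push, pop) are made up for this example; they are not part of the NodeMcu code.

```python
class PublishRing:
    """Fixed-size FIFO of (topic, message) pairs that refuses writes when full."""

    def __init__(self, size):
        self.size = size
        self.slots = [None] * size
        self.head = 0    # index of the next entry to publish
        self.count = 0   # number of entries waiting to be published

    def push(self, topic, message):
        """Queue a message; return False (and drop it) when the buffer is full."""
        if self.count == self.size:
            return False
        tail = (self.head + self.count) % self.size
        self.slots[tail] = (topic, message)
        self.count += 1
        return True

    def pop(self):
        """Return the oldest queued entry, or None when the buffer is empty."""
        if self.count == 0:
            return None
        entry = self.slots[self.head]
        self.slots[self.head] = None
        self.head = (self.head + 1) % self.size
        self.count -= 1
        return entry
```

Keeping an explicit count, instead of comparing head and tail, is what lets the sketch tell a full buffer apart from an empty one. The caller can then decide what to do when push returns False.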

Also at the beginning I print the heap space before running and connecting to a MQTT broker.

  • So after a reset: node.heap = 19600 ( with init.lua file)
  • Running the above file ( dofile("mqtt_cbuf.lua")): node.heap = 10560 after compiling and before connecting to the MQTT broker
  • After connecting: node.heap = 8136

If we use the new node.compile function, calling node.compile("mqtt_cbuf.lua"), we should now have a new file named mqtt_cbuf.lc. So:

  • After a reset: node.heap = 19472 ( with init.lua file)
  • Running the compiled file ( dofile("mqtt_cbuf.lc")): node.heap = 12160 after loading and before connecting to the MQTT broker
  • After connecting: node.heap = 9720

A bit better. Not much, but makes a difference.
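To put a number on "a bit better", this small Python sketch just subtracts the heap readings quoted above. The dictionary and function names are made up for the example; the values are the ones measured in this post.

```python
# Heap readings (bytes) copied from the measurements above.
source = {"after_load": 10560, "after_connect": 8136}     # dofile("mqtt_cbuf.lua")
compiled = {"after_load": 12160, "after_connect": 9720}   # dofile("mqtt_cbuf.lc")


def heap_gain(stage):
    """Extra free heap, in bytes, when running the compiled file."""
    return compiled[stage] - source[stage]


for stage in ("after_load", "after_connect"):
    gain = heap_gain(stage)
    percent = 100.0 * gain / source[stage]
    print(f"{stage}: +{gain} bytes ({percent:.1f}% more free heap)")
```

So the compiled file leaves roughly 1.6 KB more free heap at both stages.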

Since even a small piece of code like the one above can fail to run due to lack of heap space, solving some of the issues that I’ve referred to at the beginning of this post might only be feasible if really needed. Let’s see if I can improve this.

ESP8266: Logging data in a MySQL database

This post shows how to store data that comes from the esp8266 into a MySQL database.
I assume the following architecture:
– A LAMP server (Linux, Apache, MySQL and PHP) or a WAMP server (on Windows).
– An ESP8266 running the NodeMCU firmware with the Lua language, which periodically calls the LAMP server and passes the data on the HTTP request query string.

There are other ways of passing data, for example, using MQTT and with Node-Red storing data into MySQL, for example, but that is another story.

Creating the database:
Let’s create the database. Access the LAMP server and, as a user with enough privileges, execute the mysql client command line utility. Normally the user is root.

Create a user for accessing the database server:
mysql> create user 'esp8266'@'localhost' identified by 'secretpwd';
Query OK, 0 rows affected (0.06 sec)

Create the database:
mysql> create database esp8266;
Query OK, 1 row affected (1.40 sec)

Grant the necessary privileges to access the database engine:
mysql> grant usage on *.* to 'esp8266'@'localhost' with max_queries_per_hour 10000;
Query OK, 0 rows affected (0.07 sec)

In the above statement, the 'with max_queries_per_hour' clause is optional, and it can be left out if there is no reason to limit resource access:
mysql> grant usage on *.* to 'esp8266'@'localhost';
Query OK, 0 rows affected (0.02 sec)

And let’s make sure that the esp8266 mysql user can do whatever it needs on the esp8266 database:
mysql> grant all on esp8266.* to 'esp8266'@'localhost';
Query OK, 0 rows affected (0.02 sec)

So now we can quit, and access the newly created database to create, in the case of this post, a single table for storing data. Let’s access the new database:

mysql -u esp8266 -psecretpwd esp8266

And create the table:

mysql> create table DataTable ( logdate DATETIME , field VARCHAR(64), value BIGINT);
Query OK, 0 rows affected (0.70 sec)

I’ve added a column named field so that, in the future, we can query data based on the field name.

Storing data:

Storing data is done through the following PHP page, called store.php. It only stores data if the request query string has a parameter named heap:

<?php
$servername = "localhost";
$username = "esp8266";
$password = "secretpwd";
$dbname = "esp8266";

$now = new DateTime();
parse_str( html_entity_decode( $_SERVER['QUERY_STRING']) , $out);

if ( array_key_exists( 'heap' , $out ) ) {
    // Create connection
    $conn = new mysqli($servername, $username, $password, $dbname);
    // Check connection
    if ($conn->connect_error) {
        die("Connection failed: " . $conn->connect_error);
    }

    $datenow = $now->format('Y-m-d H:i:s');
    // Cast to integer so that only a number ever reaches the SQL statement
    $hvalue  = (int) $out['heap'];

    $sql = "INSERT INTO DataTable ( logdate , field  , value) VALUES ( '$datenow' , 'heap', $hvalue )";

    if ($conn->query($sql) === TRUE) {
        echo "New record created successfully";
    } else {
        echo "Error: " . $sql . "<br>" . $conn->error;
    }

    $conn->close();
}
?>
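Because the value comes straight from the request, it should never be pasted into the SQL text unchecked. As a language neutral illustration of that idea, here is a minimal Python sketch that stores a reading using bound parameters. It uses an in-memory SQLite database only as a stand-in for MySQL, and store_reading is a hypothetical helper name, not something from store.php.

```python
import sqlite3
from datetime import datetime


def store_reading(conn, field, value):
    """Insert one reading using bound parameters, never string concatenation."""
    logdate = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn.execute(
        "INSERT INTO DataTable (logdate, field, value) VALUES (?, ?, ?)",
        (logdate, field, int(value)),  # int() raises ValueError on non-numeric input
    )
    conn.commit()


# In-memory database with the same columns as the MySQL table above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE DataTable (logdate DATETIME, field VARCHAR(64), value BIGINT)"
)
store_reading(conn, "heap", "19600")
```

A request such as heap=1; DROP TABLE DataTable is rejected by int() before it gets anywhere near the database.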

The esp8266 code:

The esp8266 code is based on code of some of my previous posts, but basically if we have WIFI connection the following code segment (stored on a file called ping.lua) is called periodically:


conn=net.createConnection(net.TCP, false)
conn:on("receive", function(conn, payload) print("Get done.", payload) end )
conn:connect(80,"xx.xx.xx.xx")
conn:send("GET /store.php?heap=" .. node.heap() .." HTTP/1.1\r\nHost: xx.xx.xx.xx\r\n" .. "Connection: keep-alive\r\nAccept: */*\r\n\r\n")
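The string sent by conn:send is just a raw HTTP/1.1 request. As a sketch of how it is put together, this Python function builds the same text line by line. The function name build_store_request and the host example.local are only illustrative.

```python
def build_store_request(host, heap):
    """Return the raw HTTP/1.1 GET request that reports one heap reading."""
    lines = [
        f"GET /store.php?heap={int(heap)} HTTP/1.1",
        f"Host: {host}",
        "Connection: keep-alive",
        "Accept: */*",
        "",   # the empty line that ends the header section
        "",
    ]
    return "\r\n".join(lines)


request = build_store_request("example.local", 19600)
```

Every header line ends with \r\n, and the extra empty line tells the server that the headers are complete.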

Viewing data:

We can view data stored on the MySQL database with the following code (list.php):

<?php
$servername = "localhost";
$username = "esp8266";
$password = "secretpwd";
$dbname = "esp8266";

parse_str( html_entity_decode( $_SERVER['QUERY_STRING']) , $out);

// Create connection
$conn = new mysqli($servername, $username, $password, $dbname);
// Check connection
if ($conn->connect_error) {
    die("Connection failed: " . $conn->connect_error);
}

$sqlq = "SELECT * FROM DataTable";

if ( array_key_exists( 'field' , $out ) ) {
    // Escape the user supplied value before placing it in the query
    $sqlq = $sqlq . " WHERE field = '" . $conn->real_escape_string( $out['field'] ) . "'";
}

// echo $sqlq;  // Uncomment to debug the generated query

$result = mysqli_query( $conn , $sqlq );

if ( $result && $result->num_rows > 0 ) {

    echo "<table border='1'>
<tr>
<th>Log Date</th>
<th>Field</th>
<th>Value</th>
</tr>";

    while($row = mysqli_fetch_array($result))
    {
        echo "<tr>";
        echo "<td>" . $row['logdate'] . "</td>";
        echo "<td>" . htmlspecialchars($row['field']) . "</td>";
        echo "<td>" . $row['value'] . "</td>";
        echo "</tr>";
    }
    echo "</table>";
}
$conn->close();

?>

If we call the list.php file without parameters, it will show all the available data. Otherwise it only shows what is specified on the field query parameter, for example: http://address/list.php?field=sensor1
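The optional filter boils down to adding a WHERE clause only when the field parameter is present. Here is a minimal Python sketch of that logic, again with an in-memory SQLite database standing in for MySQL. The function name list_readings and the two sample rows are made up for the example.

```python
import sqlite3


def list_readings(conn, field=None):
    """Return all rows, or only the rows whose field column matches."""
    query = "SELECT logdate, field, value FROM DataTable"
    params = ()
    if field is not None:
        query += " WHERE field = ?"   # placeholder; the value is bound separately
        params = (field,)
    return conn.execute(query, params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE DataTable (logdate DATETIME, field VARCHAR(64), value BIGINT)"
)
# Hypothetical sample rows, only for the illustration.
conn.executemany(
    "INSERT INTO DataTable VALUES (?, ?, ?)",
    [("2015-02-24 11:35:16", "heap", 19600),
     ("2015-02-24 11:35:21", "sensor1", 23)],
)
```

Calling list_readings(conn) returns both rows, while list_readings(conn, "sensor1") returns only the sensor1 row.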

Logstash: How to trigger an alarm or event

The ELK stack (ElasticSearch, LogStash and Kibana) is a great tool to centralize log monitoring. The following configuration is what I use for triggering an alarm or event when something that needs immediate attention happens on the log file.

The tip is quite easy to implement and is done by changing the configuration file for the logstash instance that gets data from the Redis database into the ElasticSearch.

For example, the base configuration file is something like this:

input {
  redis {
    host => "127.0.0.1"
    # these settings should match the output of the agent
    data_type => "list"
    key => "logstash"
    # We use the 'json' codec here because we expect to read
    # json events from redis.
    codec => json
  }
}

output {
  #stdout { debug => true debug_format => "json"}
  elasticsearch {
    host => "127.0.0.1"
  }
}

This comes straight out of the Logstash documentation.

What if I want to do something when some event/string shows up in the log files?

All that is needed is to change the output section to the following:

output {
  #stdout { debug => true debug_format => "json"}

  elasticsearch {
    host => "127.0.0.1"
  }

  if "com.megacorp.security.exception.Exception" in [message]
  {
    exec {
      command => "/home/megacorp/monit/bin/notify.sh Error_message &"
    }
  }
}

So all that is needed is to add the if clause, and the condition behaves like grep on the JSON message field.
We can add this section several times to call any external command with exec, or send mail, or do anything else that logstash supports. Just make sure that, if using the exec plugin, the command has the & operator at the end so it runs in the background and does not block the logstash agent.
Also the [message] JSON component can be changed to anything that the JSON provides, like [tags] or [type].
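To make the "behaves like grep" remark concrete, here is a small Python sketch of the test that the if clause performs on each event. This is only a model of the condition, not Logstash code. The function name should_alert is made up, and the list handling reflects my understanding that the in operator checks membership when the field is an array such as [tags].

```python
import json

ALERT_MARKER = "com.megacorp.security.exception.Exception"


def should_alert(event_json, field="message", marker=ALERT_MARKER):
    """Model of `if "<marker>" in [field]` applied to one JSON event."""
    event = json.loads(event_json)
    value = event.get(field, "")
    if isinstance(value, list):        # e.g. [tags] is a list of strings
        return marker in value         # membership test
    return marker in str(value)        # substring test, like grep
```

For example, an event whose message is "x com.megacorp.security.exception.Exception: boom" matches, while one whose message is "all good" does not.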

 

ESP8266, NodeMcu and MQTT: Event publishing queueing

The MQTT code that I’ve copied and changed, which is in the following post: https://primalcortex.wordpress.com/2015/02/06/nodemcu-and-mqtt-how-to-start/ works fine for subscribing to topics as long as you don’t overload it: https://primalcortex.wordpress.com/2015/02/13/esp8266-nodemcu-and-mqtt-event-subscription-load-testing/

The code subscribes to an array of topics, and this works more or less fine. But it also creates a timer to periodically call a publish_data function, for testing purposes. In reality, imagine that we want to publish something when an input pin changes value. We can do that with the trig function. For example:

gpio.trig(5, "down",publish_data1)
gpio.trig(4, "down",publish_data2)

Where publish_data# are functions that publish something to an MQTT topic. The problem is that sometimes both triggers might be activated (two buttons pressed at the same time, for example) and try to publish to an MQTT topic at the same time.
Let’s simulate this (please note that this code is a section of the code in the MQTT Getting Started post):

-- Sample publish functions:
function publish_data(topic, message)
if pub_sem == 0 then
pub_sem = 1
m:publish( topic, message ,0 ,0 , function(conn)
print("Sending to " .. topic ..": " .. message)
pub_sem = 0
id1 = id1 +1
end)
end
end

function publish_data1()
publish_data("Out1","Mensagem para out1" )
publish_data("Out2","Mensagem para out2" )
end

In the above code sample, the Out2 topic message is never published because it takes time for the message to the previous topic (Out1) to go out, and so the message gets lost. This happens because we can’t call m:publish for MQTT publishing before the previous call has ended, hence the semaphore to avoid this.
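The lost message can be reproduced away from the esp8266 with a few lines of Python. This is only a model of the semaphore logic. FakeClient and GuardedPublisher are made up names, and the fake client completes a publish only when complete() is called, to mimic the delay of a real publish.

```python
class FakeClient:
    """Stand-in for the MQTT client: a publish finishes only on complete()."""

    def __init__(self):
        self.sent = []
        self._pending = None

    def publish(self, topic, message, on_done):
        self._pending = (topic, message, on_done)

    def complete(self):
        topic, _message, on_done = self._pending
        self._pending = None
        self.sent.append(topic)
        on_done()


class GuardedPublisher:
    """Same guard as the Lua code: skip the call while a publish is in flight."""

    def __init__(self, client):
        self.client = client
        self.pub_sem = 0

    def publish_data(self, topic, message):
        if self.pub_sem == 0:
            self.pub_sem = 1
            self.client.publish(topic, message, self._release)
            return True
        return False          # message silently dropped

    def _release(self):
        self.pub_sem = 0


client = FakeClient()
pub = GuardedPublisher(client)
first = pub.publish_data("Out1", "Mensagem para out1")
second = pub.publish_data("Out2", "Mensagem para out2")  # semaphore still held
client.complete()
print(client.sent)  # ['Out1']
```

The second call returns without publishing because the semaphore is still held, which is exactly what happens to Out2.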

So we are going to do some queueing for publishing:

I’m using an unbounded Lua array, with two pointers, one for the Head, and another for the Tail.
Please note that this code is not perfect. A queued message is only sent when a new message is added, because nothing restarts publishing after the previous publish completes.
Also note that this simple code takes huge amounts of memory, even in the “compile/interpretation” phase.
But it works for a while before crashing due to lack of memory.

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16" -- Ip/hostname of MQTT broker
BRPORT = 1883 -- MQTT broker port
BRUSER = "user" -- If MQTT authentication is used then define the user
BRPWD = "pwd" -- The above user password
CLIENTID = "ESP8266-" .. node.chipid() -- The MQTT ID. Change to something you like

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} -- Add/remove topics to the array

-- Control variables.
pub_sem = 0 -- MQTT Publish semaphore. Stops the publishing when the previous hasn't ended
current_topic = 1 -- variable for one currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts, worked for me (local network) down to 5...YMMV

-- Publishing structures
pub_topic = {}
pub_message = {}
pub_head = 1
pub_tail = 1

-- connect to the broker
print("heap: " .. node.heap() )
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)

print "Connecting to MQTT broker. Please wait..."
print("heap: " .. node.heap() )
m:connect( BROKER , BRPORT, 0, function(conn)
print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
mqtt_sub() --run the subscription function
end)

function mqtt_sub()
if table.getn(topics) < current_topic then
run_main_prog()
else
m:subscribe(topics[current_topic] , 0, function(conn)
end)
current_topic = current_topic + 1
tmr.alarm(5, topicsub_delay, 0, mqtt_sub )
end
end

function _publish_data()
if pub_head ~= pub_tail then
if pub_sem == 0 then
pub_sem = 1
m:publish( pub_topic[pub_head], pub_message[pub_head] ,0 ,0 , function(conn)
print("Sending to " .. pub_topic[pub_head] ..": " .. pub_message[pub_head])
pub_sem = 0
pub_topic[pub_head] = nil
pub_message[pub_head] = nil
pub_head = pub_head + 1
end)
end
end
end

function publish_data(topic , message )
pub_topic[pub_tail] = topic
pub_message[pub_tail] = message
pub_tail = pub_tail + 1
_publish_data()
end

function publish_data1()
publish_data("Out1","Mensagem para out1" )
publish_data("Out2","Mensagem para out2" )
end

function run_main_prog()
print("Main program")

tmr.alarm(2, 5000, 1, publish_data1 )

-- Callback to receive the subscribed topic messages.
m:on("message", function(conn, topic, data)
print(topic .. ":" )
if (data ~= nil ) then
print ( data )
end
end )
end
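One possible improvement, sketched here in Python rather than Lua, is to let the completion callback start the next publish, so the queue drains without waiting for a new message to arrive. This is a model of the idea under that assumption, not tested NodeMcu code. QueuedPublisher and fake_send are hypothetical names.

```python
from collections import deque


class QueuedPublisher:
    """Queue messages and send them one at a time, draining on each completion."""

    def __init__(self, send):
        self.send = send          # send(topic, message, on_done): hypothetical transport
        self.queue = deque()
        self.busy = False         # plays the role of pub_sem

    def publish_data(self, topic, message):
        self.queue.append((topic, message))
        self._publish_next()

    def _publish_next(self):
        if self.busy or not self.queue:
            return
        self.busy = True
        topic, message = self.queue.popleft()
        self.send(topic, message, self._on_done)

    def _on_done(self):
        self.busy = False
        self._publish_next()      # the step that the Lua callback above does not do


# Fake transport: records the topic and keeps the callback for later.
sent, callbacks = [], []


def fake_send(topic, message, on_done):
    sent.append(topic)
    callbacks.append(on_done)


pub = QueuedPublisher(fake_send)
pub.publish_data("Out1", "Mensagem para out1")
pub.publish_data("Out2", "Mensagem para out2")
while callbacks:
    callbacks.pop(0)()            # simulate each publish completing
```

With this change both Out1 and Out2 go out in order, and the queue is empty afterwards instead of growing by one entry on every timer cycle.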

ESP8266, NodeMcu and MQTT: Event subscription load testing

I’ve been doing some testing with the latest NodeMcu regarding MQTT support. My experience is, more or less, documented here: https://primalcortex.wordpress.com/2015/02/06/nodemcu-and-mqtt-how-to-start/ .

I wanted to check whether concurrent subscriptions work fine, since I know that publishing is not straightforward. In the code from the above link, the esp8266 subscribes to four topics, named topic1, topic2 and so on.

This simple bash script repeatedly calls the mosquitto broker publishing command:

#!/bin/bash
for i in {1..60}
do
/home/pcortex/01.Develop/mosquitto/mosquitto-1.3.5/client/mosquitto_pub -h 192.168.1.16 -m "Mensagem $i" -t topic1
/home/pcortex/01.Develop/mosquitto/mosquitto-1.3.5/client/mosquitto_pub -h 192.168.1.16 -m "Mensagem $i" -t topic2
done

With low values on the loop count, the MQTT code on the esp8266 seems to work fine, but with higher values, as above (60), the esp8266 hangs and reboots. One interesting thing was that, after some testing, I was not able to bring the esp8266 back online, not even by doing a reset or completely powering it off. It just put out garbage on the serial output and that was it. Strange indeed.

So I reflashed it with the latest NodeMcu version (lots of reflashing hey :) ), but still the esp8266 did not recover! Only garbage came out when connecting to the serial port (and no, it wasn’t a serial port rate issue). So I reflashed the original AT firmware, and it came back to life, and then I reflashed it again with the latest NodeMcu version.

Adding a delay to the bash script before the done line, like sleep 1, seems to keep the esp8266 running without issue.

So indeed this shows, that bombarding the esp8266 with MQTT messages in burst mode is not a good idea…

Nodemcu and MQTT: How to start

The latest nodemcu version has MQTT protocol support already built in. MQTT is one of the supporting protocols for building IoT (Internet of Things) devices.

So I’ve done some tests using the new MQTT functions, with code based on this thread on the ESP8266 forum http://www.esp8266.com/viewtopic.php?f=19&t=1278&start=40 (my code is based on the helix user code. Thanks!). I’ve made some changes to make things work in a more robust way, which I’m documenting in this post.

My initial tests were done with nodemcu version 20150123, but I’ll write this post based on version NodeMCU 0.9.5 build 20150127, so if you are using an older or newer version of the firmware, the code in this post can behave a bit differently from what is expected. Also I’m using an MQTT broker named RSMB (Really Small Message Broker) running on my Synology NAS, but we can run any MQTT broker. Also for testing I’m using the great MQTT-SPY program.

The init.lua code that I use is as documented in this post: https://primalcortex.wordpress.com/2014/12/30/esp8266-nodemcu-and-lua-language-and-some-arduino-issues/ . That code makes sure that a WiFi connection is made before continuing. The only difference is that in the init.lua code, namely the launch function, if a successful connection is made, instead of calling a ping.lua file repeatedly, a mqtt.lua file is called once. To do this just change CMDFILE to be CMDFILE = "mqtt.lua". Note that the timer is one shot now. It calls the mqtt.lua file and that’s it. Then the mqtt.lua file takes control.

CMDFILE = "mqtt.lua"   -- File that is executed after connection
-- Change the code of this function that it calls your code.
function launch()
  print("Connected to WIFI!")
  print("IP Address: " .. wifi.sta.getip())
  tmr.alarm(0, 1000, 0, function() dofile(CMDFILE) end )  -- Zero as third parameter. Call once the file.
end  -- !!! Increase the delay to like 10s if developing mqtt.lua file otherwise firmware reboot loops can happen

The MQTT API is used by basically creating an MQTT Client with mqtt.Client(), then connecting, and finally subscribing or publishing to topics. Subscriptions will call a callback function when data is received, and publishing will also call a callback function when data is sent.

It also seems that only one message can be published at a time, otherwise we get an error like this: unprotected error in call to Lua API (pir.lua:34: sending in process), and the module reboots. I’ve solved the issue, as a first approach, by using a semaphore that blocks a new call before the older one is finished.

The following code uses two different functions to publish to a topic, due to the use of the publish semaphore. Please read the code comments to follow the way the code works:

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16"   -- Ip/hostname of MQTT broker
BRPORT = 1883             -- MQTT broker port
BRUSER = "user"           -- If MQTT authentication is used then define the user
BRPWD  = "pwd"            -- The above user password
CLIENTID = "ESP8266-" ..  node.chipid() -- The MQTT ID. Change to something you like

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} -- Add/remove topics to the array

-- Control variables.
pub_sem = 0         -- MQTT Publish semaphore. Stops the publishing when the previous hasn't ended
current_topic  = 1  -- variable for one currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts, worked for me (local network) down to 5...YMMV
id1 = 0
id2 = 0

-- connect to the broker
print "Connecting to MQTT broker. Please wait..."
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)
m:connect( BROKER , BRPORT, 0, function(conn)
     print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
     mqtt_sub() --run the subscription function
end)

function mqtt_sub()
     if table.getn(topics) < current_topic then
          -- if we have subscribed to all topics in the array, run the main prog
          run_main_prog()
     else
          --subscribe to the topic
          m:subscribe(topics[current_topic] , 0, function(conn)
               print("Subscribing topic: " .. topics[current_topic - 1] )
          end)
          current_topic = current_topic + 1  -- Goto next topic
          --set the timer to rerun the loop as long there is topics to subscribe
          tmr.alarm(5, topicsub_delay, 0, mqtt_sub )
     end
end

-- Sample publish functions:
function publish_data1()
   if pub_sem == 0 then  -- Is the semaphore set?
     pub_sem = 1  -- No. Let's block it
     m:publish("temperature","hello",0,0, function(conn) 
        -- Callback function. We've sent the data
        print("Sending data1: " .. id1)
        pub_sem = 0  -- Unblock the semaphore
        id1 = id1 +1 -- Let's increase our counter
     end)
   end  
end

function publish_data2()
   if pub_sem == 0 then
     pub_sem = 1
     m:publish("ts","hello",0,0, function(conn) 
        print("Sending data2: " .. id2)
        pub_sem = 0
        id2 = id2 + 1
     end)
   end  
end

--main program to run after the subscriptions are done
function run_main_prog()
     print("Main program")
     
     tmr.alarm(2, 5000, 1, publish_data1 )
     tmr.alarm(3, 6000, 1, publish_data2 )
     -- Callback to receive the subscribed topic messages. 
     m:on("message", function(conn, topic, data)
        print(topic .. ":" )
        if (data ~= nil ) then
          print ( data )
        end
      end )
end

This seems to work fine for some time, but then, even with some free heap space, it stops working. No idea why. Even the subscriptions fail to work, and at the broker I get a connection ending event due to time-out.

20150206 213132.389 CWNAN0033I Connection attempt to listener 1883 received from client ESP8266-10350678 on address 192.168.1.82:32456 
20150206 214010.033 CWNAN0024I 120 second keepalive timeout for client ESP8266-10350678, ending connection

When it reaches this stage, where it seems locked, the semaphore is “red”: pub_sem = 1…

Another interesting note: if in the main function timers 2 and 3 have the same interval, publish_data2 is never called. For example:

     tmr.alarm(2, 5000, 1, publish_data1 )
     tmr.alarm(3, 5000, 1, publish_data2 )

Interesting… probably both timers are triggered at the same time and only the first one is honoured.
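Another explanation that fits the code is the semaphore itself: if both timers fire on the same tick, the first callback takes pub_sem and the second finds it already set. The following Python sketch simulates that under two assumptions of mine: that timer 2 is serviced before timer 3, and that a publish holds the semaphore for a fixed, made up time (publish_ms). It is a toy model, not a measurement of the firmware.

```python
def simulate(interval1_ms, interval2_ms, total_ms, publish_ms=50):
    """Count how often each guarded publish function actually sends.

    publish_ms is a hypothetical time during which the semaphore stays held.
    The first timer is serviced before the second when both expire together.
    """
    sem_free_at = 0                 # time at which the semaphore is released
    sent = {1: 0, 2: 0}
    for now in range(1, total_ms + 1):
        for timer, interval in ((1, interval1_ms), (2, interval2_ms)):
            if now % interval == 0 and now >= sem_free_at:
                sent[timer] += 1
                sem_free_at = now + publish_ms
    return sent


same = simulate(5000, 5000, 60000)      # both timers at 5000 ms
different = simulate(5000, 6000, 60000)  # the 5000/6000 ms setup from the code
```

In this model, with equal intervals the second function never gets the semaphore, while with 5000 and 6000 ms it only loses the ticks where both timers coincide (every 30 seconds).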

Second, the way this is done means that a publish_data function must be built for every topic, and this will increase the contention for the semaphore.

I’ll keep this post short and end it here, but I’ll write a follow up post where the publish_data function is unified for any topic and where, I hope, we will be using queues for queueing the messages to be published.