ESP8266, NodeMcu and MQTT: Event publishing queueing

The MQTT code that I copied and adapted in the MQTT Getting Started post works fine for subscribing to topics, as long as you don't overload it.

The code subscribes to an array of topics for receiving messages, and this works more or less fine. It also creates a timer that periodically calls a publish_data function, for testing purposes. In a real application, imagine that we want to publish something when an input pin changes value. We can do that with the gpio.trig function. For example:

gpio.trig(5, "down", publish_data1)
gpio.trig(4, "down", publish_data2)

Here the publish_data# functions each publish something to an MQTT topic. The problem is that both triggers may fire at nearly the same time (two buttons pressed together, for example) and then try to publish to an MQTT topic at the same time.
Let's simulate this (please note that this code is a section of the code in the MQTT Getting Started post):

-- Sample publish functions:
function publish_data(topic, message)
  -- Only publish when the previous publish has completed
  if pub_sem == 0 then
    pub_sem = 1
    m:publish( topic, message, 0, 0, function(conn)
      print("Sending to " .. topic .. ": " .. message)
      pub_sem = 0
      id1 = id1 + 1
    end)
  end
end

function publish_data1()
  publish_data("Out1", "Mensagem para out1") -- "Message for out1"
  publish_data("Out2", "Mensagem para out2") -- "Message for out2"
end

In the above code sample, the message for the Out2 topic is never published. The message for the previous topic (Out1) takes time to go out, and until it does the semaphore is still taken, so the second call returns without doing anything and its message is lost. The semaphore is there because m:publish can't be called again before the previous call has completed.
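
The lost message can be reproduced off the device with plain Lua. This is only a sketch: fake_client, pending, sent and dropped are names made up for the example, and fake_client merely imitates the way m:publish returns before its callback runs. The else branch is also an addition, there only to record what the original code silently discards.

```lua
-- Hypothetical stand-in for the MQTT client: publish() stores the completion
-- callback instead of sending anything, so it returns before the "send" ends.
pending = {}
fake_client = {}
function fake_client:publish(topic, message, qos, retain, done)
  pending[#pending + 1] = done
end

pub_sem = 0
sent = {}
dropped = {}

function publish_data(topic, message)
  if pub_sem == 0 then
    pub_sem = 1
    fake_client:publish(topic, message, 0, 0, function()
      sent[#sent + 1] = topic
      pub_sem = 0
    end)
  else
    -- Not in the original code: there the message simply disappears.
    dropped[#dropped + 1] = topic
  end
end

publish_data("Out1", "Mensagem para out1")
publish_data("Out2", "Mensagem para out2") -- semaphore is still taken here

-- Later, the simulated network completes the first publish.
for _, done in ipairs(pending) do done() end

print("sent: " .. table.concat(sent, ","))       -- sent: Out1
print("dropped: " .. table.concat(dropped, ",")) -- dropped: Out2
```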

So we are going to queue the messages before publishing them.

I'm using unbounded Lua arrays (one for the topics, one for the messages) with two pointers, one for the head and another for the tail.
Please note that this code is not perfect: the last message is only sent when a new message is added.
Also note that this simple code takes a huge amount of memory, even in the "compile/interpretation" phase.
It works for a while before crashing from lack of memory.
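
The head/tail idea can be tried on its own, without any MQTT code. Below is a minimal sketch in plain Lua. The names queue_push, queue_pop, queue_len and QUEUE_MAX are made up for this example, and the cap on the queue length is an assumption about one way to keep memory use bounded; the listing that follows in this post does not do it.

```lua
QUEUE_MAX = 8      -- hypothetical cap, pick a value that fits the free heap
q_topic = {}
q_message = {}
q_head = 1         -- index of the oldest queued entry
q_tail = 1         -- index where the next entry will be stored

-- Number of entries currently waiting.
function queue_len()
  return q_tail - q_head
end

-- Remove and return the oldest entry, or nil when the queue is empty.
function queue_pop()
  if q_head == q_tail then return nil end
  local topic, message = q_topic[q_head], q_message[q_head]
  q_topic[q_head] = nil     -- release the slot so the strings can be collected
  q_message[q_head] = nil
  q_head = q_head + 1
  if q_head == q_tail then  -- empty again: restart the indexes at 1
    q_head, q_tail = 1, 1
  end
  return topic, message
end

-- Append an entry; when the cap is reached the oldest entry is discarded.
function queue_push(topic, message)
  if queue_len() >= QUEUE_MAX then
    queue_pop()
  end
  q_topic[q_tail] = topic
  q_message[q_tail] = message
  q_tail = q_tail + 1
end

queue_push("Out1", "first")
queue_push("Out2", "second")
print(queue_len())   -- 2
print(queue_pop())   -- prints Out1 and first, separated by a tab
print(queue_len())   -- 1
```

Dropping the oldest entry is just one policy. Refusing new entries when the queue is full would work as well, depending on which messages matter more.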

-- Configuration to connect to the MQTT broker.
BROKER = "" -- Ip/hostname of MQTT broker
BRPORT = 1883 -- MQTT broker port
BRUSER = "user" -- If MQTT authentication is used then define the user
BRPWD = "pwd" -- The above user password
CLIENTID = "ESP8266-" .. node.chipid() -- The MQTT ID. Change to something you like

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} -- Add/remove topics to the array

-- Control variables.
pub_sem = 0 -- MQTT publish semaphore. Stops publishing when the previous publish hasn't ended
current_topic = 1 -- index of the topic currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts, worked for me (local network) down to 5...YMMV

-- Publishing structures
pub_topic = {}
pub_message = {}
pub_head = 1
pub_tail = 1

-- connect to the broker
print("heap: " .. node.heap() )
m = mqtt.Client( CLIENTID, 120, BRUSER, BRPWD)

print "Connecting to MQTT broker. Please wait..."
print("heap: " .. node.heap() )
m:connect( BROKER , BRPORT, 0, function(conn)
  print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .." as " .. CLIENTID )
  mqtt_sub() --run the subscription function
end)

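function mqtt_sub()
  if table.getn(topics) < current_topic then
    -- All topics in the array are subscribed: run the main program
    run_main_prog()
  else
    m:subscribe(topics[current_topic] , 0, function(conn) end)
    current_topic = current_topic + 1 -- go to the next topic
    tmr.alarm(5, topicsub_delay, 0, mqtt_sub ) -- rerun while there are topics left
  end
end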

-- Sends the message at the head of the queue, if the previous publish has ended
function _publish_data()
  if pub_head ~= pub_tail then
    if pub_sem == 0 then
      pub_sem = 1
      m:publish( pub_topic[pub_head], pub_message[pub_head] ,0 ,0 , function(conn)
        print("Sending to " .. pub_topic[pub_head] ..": " .. pub_message[pub_head])
        pub_sem = 0
        pub_topic[pub_head] = nil
        pub_message[pub_head] = nil
        pub_head = pub_head + 1
      end)
    end
  end
end

-- Adds a message to the tail of the queue and tries to send the head
function publish_data(topic , message )
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = pub_tail + 1
  _publish_data()
end

function publish_data1()
  publish_data("Out1","Mensagem para out1" ) -- "Message for out1"
  publish_data("Out2","Mensagem para out2" ) -- "Message for out2"
end

function run_main_prog()
  print("Main program")

  -- Queue the two test messages every 5 seconds
  tmr.alarm(2, 5000, 1, publish_data1 )

  -- Callback to receive the subscribed topic messages.
  m:on("message", function(conn, topic, data)
    print(topic .. ":" )
    if (data ~= nil ) then
      print ( data )
    end
  end )
end
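
As noted earlier, the listing only sends the last queued message when another one is added. One possible fix, sketched here under assumptions, is to call the sender again from the publish callback so that the queue drains by itself. To keep the sketch runnable off the device, fake_client, delivered, callbacks and run_network are stand-ins made up for the example. On the ESP8266 the only change would be the extra _publish_data() call inside the m:publish callback, and I have not measured its effect on memory.

```lua
-- Hypothetical stand-in for the MQTT client: records the message and keeps
-- the completion callback until run_network() fires it.
delivered = {}
callbacks = {}
fake_client = {}
function fake_client:publish(topic, message, qos, retain, done)
  delivered[#delivered + 1] = topic .. "=" .. message
  callbacks[#callbacks + 1] = done
end

-- Fire the stored callbacks, including any added while draining.
function run_network()
  local i = 1
  while callbacks[i] do
    callbacks[i]()
    i = i + 1
  end
  callbacks = {}
end

pub_topic, pub_message = {}, {}
pub_head, pub_tail, pub_sem = 1, 1, 0

function _publish_data()
  if pub_head ~= pub_tail and pub_sem == 0 then
    pub_sem = 1
    fake_client:publish(pub_topic[pub_head], pub_message[pub_head], 0, 0, function()
      pub_sem = 0
      pub_topic[pub_head] = nil
      pub_message[pub_head] = nil
      pub_head = pub_head + 1
      _publish_data()   -- the change: send the next queued message, if any
    end)
  end
end

function publish_data(topic, message)
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = pub_tail + 1
  _publish_data()
end

publish_data("Out1", "Mensagem para out1")
publish_data("Out2", "Mensagem para out2")
run_network()
print(table.concat(delivered, ", "))
-- Out1=Mensagem para out1, Out2=Mensagem para out2
```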
