ESP8266, NodeMcu and MQTT: Event publishing queueing

The MQTT code that I copied and adapted in the following post: https://primalcortex.wordpress.com/2015/02/06/nodemcu-and-mqtt-how-to-start/ works fine for subscribing to topics, as long as you don’t overload it: https://primalcortex.wordpress.com/2015/02/13/esp8266-nodemcu-and-mqtt-event-subscription-load-testing/

The code subscribes to an array of topics, and this works more or less fine. It also creates a timer that periodically calls a publish_data function, for testing purposes. In a real application, imagine that we want to publish something when an input pin changes value. We can do that with the gpio.trig function. For example:

gpio.trig(5, "down",publish_data1)
gpio.trig(4, "down",publish_data2)

Here the publish_data# names stand for functions that publish something to an MQTT topic. The problem is that both triggers may fire at nearly the same time (two buttons pressed together, for example), so that both try to publish to an MQTT topic at once.
Let’s simulate this (please note that this code is a section of the code in the MQTT Getting Started post):

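-- Sample publish functions:
function publish_data(topic, message)
  -- Only publish when no other publish is in flight.
  if pub_sem == 0 then
    pub_sem = 1
    m:publish(topic, message, 0, 0, function(conn)
      print("Sending to " .. topic .. ": " .. message)
      pub_sem = 0      -- release the semaphore once the publish has completed
      id1 = id1 + 1    -- counter defined in the Getting Started code
    end)
  end
  -- If pub_sem is still 1, the message is silently discarded.
end

function publish_data1()
  publish_data("Out1", "Mensagem para out1")
  publish_data("Out2", "Mensagem para out2")
end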

In the above code sample, the Out2 message is never published: the Out1 message takes time to go out, the semaphore is still set when the second call arrives, and so the second message is lost. This happens because we can’t call m:publish again before the previous call has completed, hence the semaphore that guards it.
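The lost message can be reproduced off the device. The following is only a sketch for plain Lua: `fake` is a hypothetical stand-in for the NodeMCU MQTT client (it is not part of any firmware API) that delays the completion callback until `flush()` is called, and the `dropped` table is added purely to make the discarded message visible.

```lua
-- Hypothetical stand-in for the NodeMCU MQTT client: publish() only stores
-- the completion callback; flush() later simulates the publish finishing.
fake = { pending = {}, sent = {} }

function fake:publish(topic, message, qos, retain, callback)
  self.pending[#self.pending + 1] = function()
    self.sent[#self.sent + 1] = topic
    callback(self)
  end
end

function fake:flush()
  local callbacks = self.pending
  self.pending = {}
  for _, cb in ipairs(callbacks) do cb() end
end

pub_sem = 0
dropped = {}   -- illustration only: records what the semaphore rejects

function publish_data(topic, message)
  if pub_sem == 0 then
    pub_sem = 1
    fake:publish(topic, message, 0, 0, function(conn)
      pub_sem = 0
    end)
  else
    dropped[#dropped + 1] = topic   -- the article's code discards silently here
  end
end

publish_data("Out1", "Mensagem para out1")
publish_data("Out2", "Mensagem para out2")   -- arrives while pub_sem == 1
fake:flush()

print("sent:    " .. table.concat(fake.sent, ", "))
print("dropped: " .. table.concat(dropped, ", "))
```

Under these assumptions only Out1 reaches the fake client, and Out2 ends up in the dropped list.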

So we are going to add a queue for publishing.

I’m using an unbounded Lua array with two pointers, one for the head and another for the tail.
Please note that this code is not perfect. A message left waiting in the queue is only sent when a new message is added, because nothing triggers the next send once a publish completes.
Also note that this simple code takes a large amount of memory, even during the “compile/interpretation” phase.
It works for a while, but eventually crashes for lack of memory.
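Seen in isolation, the head/tail idea looks roughly like the sketch below. The names `queue`, `queue_push`, `queue_pop` and `queue_size` are hypothetical helpers introduced for illustration; the article's code keeps the same state in the globals pub_topic, pub_message, pub_head and pub_tail.

```lua
-- Hypothetical queue helpers (not from the article): an array plus two
-- indexes. head points at the oldest entry, tail at the next free slot.
queue = { items = {}, head = 1, tail = 1 }

function queue_push(q, topic, message)
  q.items[q.tail] = { topic = topic, message = message }
  q.tail = q.tail + 1
end

function queue_pop(q)
  if q.head == q.tail then return nil end   -- queue is empty
  local item = q.items[q.head]
  q.items[q.head] = nil    -- clear the slot so it can be garbage collected
  q.head = q.head + 1
  return item
end

function queue_size(q)
  return q.tail - q.head
end

queue_push(queue, "Out1", "Mensagem para out1")
queue_push(queue, "Out2", "Mensagem para out2")
first = queue_pop(queue)
print(first.topic, queue_size(queue))
```

Clearing consumed slots keeps the table from holding on to messages that were already sent, but the indexes themselves keep growing and nothing limits how many messages may wait, which matters on a device with as little heap as the ESP8266.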

-- Configuration to connect to the MQTT broker.
BROKER = "192.168.1.16" -- IP/hostname of MQTT broker
BRPORT = 1883 -- MQTT broker port
BRUSER = "user" -- If MQTT authentication is used then define the user
BRPWD = "pwd" -- The above user password
CLIENTID = "ESP8266-" .. node.chipid() -- The MQTT ID. Change to something you like

-- MQTT topics to subscribe
topics = {"topic1","topic2","topic3","topic4"} -- Add/remove topics to the array

-- Control variables.
pub_sem = 0 -- MQTT publish semaphore. Stops publishing while the previous publish hasn't ended
current_topic = 1 -- index of the topic currently being subscribed to
topicsub_delay = 50 -- milliseconds between subscription attempts (tmr.alarm takes milliseconds); worked for me (local network) down to 5...YMMV

-- Publishing structures
pub_topic = {}
pub_message = {}
pub_head = 1
pub_tail = 1

-- connect to the broker
print("heap: " .. node.heap())
m = mqtt.Client(CLIENTID, 120, BRUSER, BRPWD)

print("Connecting to MQTT broker. Please wait...")
print("heap: " .. node.heap())
m:connect(BROKER, BRPORT, 0, function(conn)
  print("Connected to MQTT:" .. BROKER .. ":" .. BRPORT .. " as " .. CLIENTID)
  mqtt_sub() -- run the subscription function
end)

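-- Subscribes to one topic per call and re-arms a timer for the next one.
function mqtt_sub()
  if #topics < current_topic then
    -- All topics are subscribed: start the main program.
    run_main_prog()
  else
    m:subscribe(topics[current_topic], 0, function(conn)
    end)
    current_topic = current_topic + 1
    tmr.alarm(5, topicsub_delay, 0, mqtt_sub)
  end
end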

-- Sends the message at the head of the queue, if any, unless a publish
-- is already in flight.
function _publish_data()
  if pub_head ~= pub_tail then
    if pub_sem == 0 then
      pub_sem = 1
      m:publish(pub_topic[pub_head], pub_message[pub_head], 0, 0, function(conn)
        print("Sending to " .. pub_topic[pub_head] .. ": " .. pub_message[pub_head])
        pub_sem = 0
        -- Free the consumed slot and advance the head pointer.
        pub_topic[pub_head] = nil
        pub_message[pub_head] = nil
        pub_head = pub_head + 1
      end)
    end
  end
end

-- Appends a message at the tail of the queue and tries to send.
function publish_data(topic, message)
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = pub_tail + 1
  _publish_data()
end

function publish_data1()
  publish_data("Out1", "Mensagem para out1")
  publish_data("Out2", "Mensagem para out2")
end

function run_main_prog()
  print("Main program")

  tmr.alarm(2, 5000, 1, publish_data1)

  -- Callback to receive the subscribed topic messages.
  m:on("message", function(conn, topic, data)
    print(topic .. ":")
    if (data ~= nil) then
      print(data)
    end
  end)
end
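
One possible way to stop messages from waiting in the queue until the next publish_data call is to let the completion callback try the next send itself. The sketch below is a variation on the code above, not something tested on the device; to make it runnable on plain Lua, `m` is replaced by a hypothetical fake client whose `flush()` method simulates publishes completing.

```lua
-- Hypothetical fake client so the sketch runs without NodeMCU firmware.
m = { pending = {}, sent = {} }

function m:publish(topic, message, qos, retain, callback)
  self.pending[#self.pending + 1] = function()
    self.sent[#self.sent + 1] = topic
    callback(self)
  end
end

function m:flush()
  -- Complete publishes one at a time, including any started by a callback.
  while #self.pending > 0 do
    local cb = table.remove(self.pending, 1)
    cb()
  end
end

pub_topic, pub_message = {}, {}
pub_head, pub_tail, pub_sem = 1, 1, 0

function _publish_data()
  -- Nothing to do when the queue is empty or a publish is in flight.
  if pub_head == pub_tail or pub_sem ~= 0 then return end
  pub_sem = 1
  m:publish(pub_topic[pub_head], pub_message[pub_head], 0, 0, function(conn)
    pub_sem = 0
    pub_topic[pub_head] = nil
    pub_message[pub_head] = nil
    pub_head = pub_head + 1
    _publish_data()   -- the change: send the next queued message, if any
  end)
end

function publish_data(topic, message)
  pub_topic[pub_tail] = topic
  pub_message[pub_tail] = message
  pub_tail = pub_tail + 1
  _publish_data()
end

publish_data("Out1", "Mensagem para out1")
publish_data("Out2", "Mensagem para out2")
m:flush()
print(table.concat(m.sent, ", "), pub_tail - pub_head)
```

With the fake client both messages go out in order and the queue ends up empty. On real hardware the same idea should keep the queue from accumulating one extra entry per timer tick, though whether that is enough to avoid the memory problem has not been verified here.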
