As a follow-up to part 1 of the Automatiko IoT and MQTT series, I'd like to take you further in exploring workflows and IoT with MQTT.
This time we look at how to take advantage of some MQTT features (e.g. wildcard topics), collect sensor data into a bucket (or, to keep it simple, a list) and then assign user tasks based on the amount of data collected instead of for every event.
In addition to that, we look into Automatiko features that make using workflows for IoT much easier...
Wildcard topics
Let's start with an MQTT feature for subscribers, since that is exactly what the workflow in our sample is: an MQTT topic subscriber. So let's first look at how this works under the hood in Automatiko.
Automatiko uses BPMN message events (start or intermediate) to integrate with a message broker, in this case MQTT. Message events reference a message, which describes how to connect to the broker:
- by default, the message name is used as the topic name
- the data type of the message defines the type that the incoming event is unmarshalled into
- custom attributes are supported to alter the defaults
In this article we are going to use custom attributes to define both a topic that uses a wildcard and a correlation expression that extracts information from the topic instead of from the message (as we did in part 1). The two attributes are:
- topic
- correlationExpression
Topic defines the actual MQTT topic that the workflow definition connects to and listens on for incoming events.
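To make the wildcard part more concrete, here is a minimal sketch of how an MQTT topic filter is matched against a concrete topic. In MQTT, + stands for exactly one topic level and # stands for any number of remaining levels (and is only valid as the last level). The class TopicFilter and the filter home/+/temp are assumptions made for illustration; this is not how Automatiko or the broker implements matching, and it ignores special cases such as topics starting with $.

```java
// Hypothetical helper for illustration only; the broker does the real matching.
final class TopicFilter {

    private TopicFilter() {
    }

    // Returns true when the concrete topic is covered by the MQTT topic filter.
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: matches everything that remains,
                // but it is only valid as the last level of the filter.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter is longer than the topic
            }
            // Single-level wildcard matches any value at this level.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without a trailing '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("home/+/temp", "home/kitchen/temp"));     // prints "true"
        System.out.println(matches("home/+/temp", "home/kitchen/humidity")); // prints "false"
        System.out.println(matches("home/#", "home/kitchen/temp"));          // prints "true"
    }
}
```

With a filter like home/+/temp, a single workflow definition would receive temperature events from every room, which is why the room name then has to come from the topic itself.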
The correlation expression, on the other hand, defines an expression that is evaluated for each incoming event to extract a key used for both identification and correlation.
The correlation expression uses a special function, topic, that accepts the following parameters:
- message - references the incoming message
- index - the position of the topic level to extract, counted from 0
So for the expression topic(message, 1) and the topic home/kitchen/temp, the extracted correlation key will be kitchen. This key is in turn used as the identifier of the workflow instance, so you can use it in the service API calls.
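The extraction described above can be sketched in a few lines of plain Java. The class TopicSegments and its segment method are hypothetical names used only for this illustration; unlike the real topic function, the sketch takes the topic string directly instead of the incoming message.

```java
import java.util.Objects;

// Hypothetical helper, not Automatiko's implementation: it only mirrors the
// behaviour described for topic(message, index).
final class TopicSegments {

    private TopicSegments() {
    }

    // Returns the zero-based level of an MQTT topic.
    static String segment(String topic, int index) {
        Objects.requireNonNull(topic, "topic");
        String[] levels = topic.split("/", -1);
        if (index < 0 || index >= levels.length) {
            throw new IllegalArgumentException("Topic '" + topic + "' has no level " + index);
        }
        return levels[index];
    }

    public static void main(String[] args) {
        System.out.println(segment("home/kitchen/temp", 1));    // prints "kitchen"
        System.out.println(segment("home/livingroom/temp", 1)); // prints "livingroom"
    }
}
```

Each distinct value at index 1 becomes its own correlation key, so events for the kitchen and for the living room would end up in separate workflow instances.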
You can read more about the messaging support in the Automatiko documentation.
Data bucket to collect sensor data
The next topic for today is the collection of sensor data. In part 1 we simply assigned each event to a data object of the same type. This time we go further and make sure we can accumulate the data.
With that said, there are a few things that must be done:
- the data object must be of type list
- the data object of type list needs to be initialized so it can readily accept new items
- message events received from MQTT need to be added to the list instead of being assigned to the data object (which would overwrite the previous value)
Changing the type is rather simple, but ensuring the list is initialized might be a bit more complex... but not with Automatiko :) It is as simple as adding a tag on the data object - auto-initialized
Configuring the message event to append to the list instead of assigning to the data object is also rather simple with Automatiko: use an expression instead of a direct mapping on the message event.
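The difference between the two mappings can be sketched in plain Java. This only illustrates the behaviour, not what Automatiko generates: BucketDemo, Reading and onMessage are hypothetical names, and the pre-created empty list stands in for what the auto-initialized tag provides.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: contrasts "assign" (direct mapping) with "append" (expression).
final class BucketDemo {

    // Hypothetical payload type standing in for the sensor event.
    static final class Reading {
        final double value;

        Reading(double value) {
            this.value = value;
        }
    }

    // Direct mapping target: every new event replaces the previous one.
    Reading latest;

    // Stand-in for an auto-initialized data object: an empty list, never null.
    final List<Reading> bucket = new ArrayList<>();

    void onMessage(Reading reading) {
        latest = reading;    // assignment keeps only the most recent event
        bucket.add(reading); // appending keeps the whole history
    }

    public static void main(String[] args) {
        BucketDemo demo = new BucketDemo();
        demo.onMessage(new Reading(21.5));
        demo.onMessage(new Reading(22.0));
        System.out.println(demo.latest.value);  // prints "22.0"
        System.out.println(demo.bucket.size()); // prints "2"
    }
}
```

Without the initialization step the first append would hit a null list, which is the problem the tag takes care of.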
And that's it - we now have a ready-to-use data bucket for our sensor data.
Decide when to include human actors
In this simple example we are going to receive events from MQTT and collect them into a bucket. But we don't want to involve human actors on each event. So here we can use gateways - a construct that allows us to have different paths in the workflow.
We only create a user task when the bucket has more than 5 events collected. Otherwise the path simply ends. But to prevent the workflow instance from finishing, we make the workflow ad hoc - that allows it to stay active even though there are no active nodes in it. Marking a workflow as ad hoc is done in the Properties -> Process panel.
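The decision taken at the gateway boils down to a size check on the bucket. The sketch below shows just that logic in plain Java; BucketGateway, THRESHOLD and needsUserTask are hypothetical names, and in the workflow itself the check would live as a condition on the outgoing path of the gateway rather than in a class like this.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the gateway decision only, not generated workflow code.
final class BucketGateway {

    // Assumed threshold taken from the text: more than 5 collected events.
    static final int THRESHOLD = 5;

    private BucketGateway() {
    }

    // True when the path with the user task should be taken.
    static boolean needsUserTask(List<?> bucket) {
        return bucket.size() > THRESHOLD;
    }

    public static void main(String[] args) {
        List<Double> bucket = new ArrayList<>();
        // Events 1 to 5 end the path; events 6 and 7 take the user task path.
        for (int i = 1; i <= 7; i++) {
            bucket.add(20.0 + i);
            String path = needsUserTask(bucket) ? "create user task" : "end path";
            System.out.println(i + " event(s): " + path);
        }
    }
}
```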
See it in action
Have a look at this 10 min video showing all this in action. Note that this is live coding so you will see the errors as they might show up and you can imagine that this is not a scripted video where everything works from the start :)
The code can be found on GitHub, where each part of the series is a separate tag and the main branch points to the latest version of the code base.
Conclusions
This part focused on bringing more advanced features of MQTT into workflows to show the integration and how powerful the two can be together. Using topic information for correlation, locating the right workflow instances for incoming events and accumulating data are among the most common use cases for IoT, so workflows should make them simple to implement.