As a follow-up to part 1 of the Automatiko IoT and MQTT series, I'd like to take you further in exploring workflows and IoT with MQTT.
This time we look at the details of how to take advantage of some MQTT features (e.g. wildcard topics), collect sensor data into a bucket (or, to keep it simple, a list) and then assign user tasks based on the amount of data collected instead of for every event.
In addition to that, we look into Automatiko features that make using workflows for IoT way easier...
Wildcard topics
Let's start with an MQTT feature for subscribers - this is actually what the workflow in our sample is - an MQTT topic subscriber. So let's first look at how it works under the hood in Automatiko.
Automatiko uses message events (start or intermediate) of BPMN to integrate with a message broker - in this case MQTT. Message events reference a message which describes how to connect to the broker:
by default the message name is used as the topic name
the data type of the message defines the type incoming events will be unmarshalled into
custom attributes are supported to alter the defaults
In this article we are going to use custom attributes to define both a topic that uses a wildcard and a correlation expression to extract information out of the topic instead of the message (as we did in part 1).
In the screenshot above you can see two custom attributes
topic
correlationExpression
Topic is used to define the actual MQTT topic that the workflow definition will be connected to, listening for incoming events.
The correlation expression, on the other hand, defines an expression that is evaluated for each incoming event to extract a key used for both identification and correlation.
The correlation expression uses a special function "topic" that accepts the following parameters
message - references the incoming message
index - an index that references different parts of the topic - it starts at 0
So for the expression topic(message, 1) and the topic home/kitchen/temp, the extracted correlation key will be kitchen. This in turn will be used as the identifier of the workflow instance, so you can use it in service API calls.
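To make the extraction concrete, here is an illustrative sketch (this is not Automatiko's internal implementation) of what the topic function evaluates to: the MQTT topic split on '/', indexed from 0.

```java
// Illustrative sketch only - not Automatiko internals.
// topic(message, index) conceptually resolves to the index-th
// segment of the MQTT topic the message arrived on.
public class TopicFunction {

    static String topicSegment(String mqttTopic, int index) {
        // MQTT topic levels are separated by '/'
        return mqttTopic.split("/")[index];
    }

    public static void main(String[] args) {
        // home/kitchen/temp with index 1 yields the correlation key
        System.out.println(topicSegment("home/kitchen/temp", 1)); // prints "kitchen"
    }
}
```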
You can read more about the messaging support in Automatiko in the documentation.
Data bucket to collect sensor data
The next topic for today is the collection of sensor data. In part one we simply assigned it to a data object of the same type. This time we expand on that and make sure we can accumulate the data.
With that said, there are a few things that must be done
the data object must be of type list
the data object of type list needs to be initialized so it can easily accept new items
message events received from MQTT need to be added to the list instead of assigned to the data object (which would mean being overridden)
Changing the type is rather simple, but ensuring the list is initialized might be a bit more complex... but not with Automatiko :) It is as simple as adding a tag on the data object - auto-initialized
Configuring the message event to append to a list instead of assigning to the data object is also rather simple with Automatiko - use an expression instead of a direct mapping on the message event.
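Conceptually, the difference between a direct mapping and the expression-based append looks like this (plain Java for illustration only; the variable names are made up, this is not Automatiko API):

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual illustration only - not Automatiko API.
public class AppendVsAssign {

    public static void main(String[] args) {
        // auto-initialized list data object - ready to accept items
        List<Double> temps = new ArrayList<>();

        // direct mapping: each incoming event replaces the previous value
        double temp = 21.5;
        temp = 22.0; // the 21.5 reading is lost

        // expression-based mapping: each incoming event is accumulated
        temps.add(21.5);
        temps.add(22.0);
        System.out.println(temps.size()); // both readings are kept
    }
}
```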
And that's it - we now have a ready-to-use data bucket for our sensor data.
Decide when to include human actors
In this simple example we are going to receive events from MQTT and collect them into a bucket. But we don't want to involve human actors for each event. So here we can use gateways - a construct that allows us to have different paths in the workflow.
We only create a user task when the bucket has more than 5 events collected. Otherwise the path simply ends. But to prevent the workflow instance from finishing, we make the workflow ad hoc - that allows it to stay active even though there are no active nodes in it. Marking a workflow as ad hoc is done in the Properties -> Process panel.
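The gateway condition itself boils down to a simple size check on the bucket, sketched here in plain Java (the bucket and method names are illustrative, not taken from the sample):

```java
import java.util.List;

// Sketch of the gateway condition - names are illustrative.
public class GatewayCondition {

    // route to the user task only once more than 5 readings are collected
    static boolean needsHumanReview(List<Double> bucket) {
        return bucket.size() > 5;
    }

    public static void main(String[] args) {
        System.out.println(needsHumanReview(List.of(20.1, 20.5, 21.0))); // false
        System.out.println(needsHumanReview(List.of(20.1, 20.5, 21.0, 21.3, 21.8, 22.0))); // true
    }
}
```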
See it in action
Have a look at this 10 min video showing all of this in action. Note that this is live coding, so you will see errors as they show up - as you can tell, this is not a scripted video where everything works from the start :)
The code can be found on GitHub, where each part of the series is a separate tag and the main branch points to the latest version of the code base.
Conclusions
This part focused on bringing more advanced features of MQTT into the workflows to show how powerful this integration can be. Using topic information as correlation, locating the right workflow instances for incoming events and accumulating data are among the most common use cases for IoT, so workflows should make them simple to realize.
Around a month ago the Automatiko project saw its first release. It aims at providing an easy to use yet powerful toolkit to build services and functions based on workflows and decisions. You can read more about the Automatiko project on the website or blog.
This blog post is a very basic introduction that addresses the first level of entry when starting with Automatiko.
Before you start...
So first things first... to get up and running you need a few things on your local computer
Java 11 +
Maven 3.6 +
Eclipse IDE and Automatiko plugin
Optionally Docker or Podman to run services as containers
The use case of this introductory sample is very simple - connect to MQTT to receive data from the sensors publishing there. It mainly focuses on the steps to get this working; further articles will show more advanced features in action.
Let's get started
To get started you need to create a project - a Maven project to be precise. Luckily, Automatiko comes with a bunch of Maven archetypes that make this task way faster. One of them is `automatiko-iot-archetype`, and this is the one we are going to use today.
The archetype can be used from the IDE or the command line - whichever you prefer, the result will be exactly the same.
Once the project is there, you need to define the data model that will be used by the workflow - as data objects. In this example you can create a simple POJO-like class named `Temperature` that has two fields
value - of type double that will contain the temperature value from the sensor
room - the location where the temperature was measured
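A minimal version of such a class could look like this (the field names must match the JSON payload published to MQTT):

```java
// Simple POJO used as the workflow data model; getters and setters let
// incoming JSON events be unmarshalled into instances of this class.
public class Temperature {

    private double value; // temperature value from the sensor
    private String room;  // location where the temperature was measured

    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }

    public String getRoom() { return room; }
    public void setRoom(String room) { this.room = room; }
}
```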
The next step is to create a workflow definition that will be responsible for the connection to the MQTT broker. This is realised by a message start event, which not only acts as the entry point for new instances but also allows defining some characteristics of the connectivity and processing.
All that is needed to talk to the MQTT broker is defined directly in the workflow definition. In our sample case, the topic in the MQTT broker is simply taken from the message defined on the start event.
At the same time, data that is received from the MQTT topic is automatically converted to the data model - represented by the `Temperature` class - and mapped to a `temp` data object inside the workflow instance.
Lastly, a user task is added to the workflow definition to introduce a human actor into the flow. That is mainly for demonstration purposes - to show that we receive data from MQTT, properly converted into a Temperature class instance and set within the workflow instance data objects (aka variables).
A bit more advanced...
As you will see when running this sample, the ID of the workflow instance is auto-generated (in UUID format). But that can be changed to take advantage of either the data or the MQTT topic itself. By setting a correlation or correlation expression on the message (via its custom attributes) you can make the id more domain specific.
In this case, we take the room from the message and use it as the correlation. The correlation, upon start of the workflow instance, becomes its id - aka business key - and by that it can be used interchangeably with the generated ID. With that said, you can use it when interacting with the service API.
You can look at the introduction video that covers the content of this article.
Let's get this running...
To get this running we need an MQTT broker. Personally I find Mosquitto to be an excellent choice, but any MQTT-compliant broker will work.
You can get Mosquitto running with just a simple command (Docker required)
docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto
There are other ways to run Mosquitto, so visit its website if Docker is not an option.
Run the Automatiko service
Since Automatiko generates a fully functional service, there is not much to do to see it in action. It is based on Maven and leverages Quarkus as the runtime, so most Quarkus features are also available out of the box, e.g. dev mode, Dev UI etc.
mvn clean quarkus:dev
and then wait a bit for Maven to download all the bits...
Once it's started you should see log entries similar to these
You can head directly to http://localhost:8080/swagger-ui and you will be presented with a nicely documented API of your service
It is now a fully functional service connected to your local MQTT broker. With this you can start publishing sensor data and see how quickly it is consumed by the Automatiko service.
You can use the mosquitto client command to publish messages from the command line
mosquitto_pub -t temperature -m '{"room":"livingroom", "value" : 25.0}'
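If you prefer to build the payload from code, here is a small stdlib-only sketch that produces the same JSON the mosquitto_pub command sends (a real client would then hand this string to an MQTT library; the class and method names are made up for illustration):

```java
import java.util.Locale;

// Builds the same JSON payload the mosquitto_pub example sends.
public class PayloadBuilder {

    static String toJson(String room, double value) {
        // Locale.ROOT keeps the decimal separator a dot regardless of system locale
        return String.format(Locale.ROOT, "{\"room\":\"%s\", \"value\" : %.1f}", room, value);
    }

    public static void main(String[] args) {
        System.out.println(toJson("livingroom", 25.0));
    }
}
```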
Use the Swagger UI to see instances created based on incoming messages from MQTT.
Running as container
In case you would like to run this as a container, it is again dead simple - just run the Maven command with the `container` profile enabled
mvn clean package -Pcontainer
And that's it. As soon as the build is over you will have the container image in your local registry.
Conclusion
That's it for the first introduction - I hope it got you interested and you will look forward to the next articles. They will come every other week... at least that's the plan. Please share your feedback either here or via the mailing list and Twitter.
If you have any ideas for use cases to cover with Automatiko, don't hesitate to let us know about them - we are here to help and explore.