2016/01/26

Advanced queries in KIE Server

As a follow-up to the Advanced queries in jBPM 6.4 article, let's take a look at queries in KIE Server - BPM capability.
Since KIE Server's BPM capability is based on the jbpm services api, it provides access to QueryService and its advanced (DashBuilder DataSets based) operations.

We are going to use the same use case - product sale - with 10 000 loaded process and task instances, and show how you can query data both via the KIE Server client and directly via the raw REST api.

KIE Server's advanced query capabilities mirror what's available in the services api, so users can:

  • register query definitions
  • replace  query definitions
  • unregister query definitions
  • get list of queries or individual query definition
  • execute queries on top of query definitions with 
    • paging and sorting
    • filter parameters
    • query with custom param builder and mappers
So let's start simple and build our KIE Server client to use query services:

KieServicesConfiguration configuration = KieServicesFactory.newRestConfiguration(serverUrl, user, password);

Set<Class<?>> extraClasses = new HashSet<Class<?>>();
extraClasses.add(Date.class); // for JSON only to properly map dates
        
configuration.setMarshallingFormat(MarshallingFormat.JSON);
configuration.addJaxbClasses(extraClasses);
        
KieServicesClient kieServicesClient =  KieServicesFactory.newKieServicesClient(configuration);
        
QueryServicesClient queryClient = kieServicesClient.getServicesClient(QueryServicesClient.class);

Now we are ready to make use of the query service via QueryServicesClient.

List the query definitions available in the system:

List<QueryDefinition> queryDefs = queryClient.getQueries(0, 10);
System.out.println(queryDefs);

Next, let's register a new query definition that we can use for advanced queries:

QueryDefinition query = new QueryDefinition();
query.setName("getAllTaskInstancesWithCustomVariables");
query.setSource("java:jboss/datasources/ExampleDS");
query.setExpression("select ti.*,  c.country, c.productCode, c.quantity, c.price, c.saleDate " +
                       "from AuditTaskImpl ti " +
                       "    inner join (select mv.map_var_id, mv.taskid from MappedVariable mv) mv " +
                       "      on (mv.taskid = ti.taskId) " +
                       "    inner join ProductSale c " +
                       "      on (c.id = mv.map_var_id)");
        
queryClient.registerQuery(query);

Once the query is registered we can make use of it and start fetching data. First, a very basic query:

List<TaskInstance> tasks = queryClient.query("getAllTaskInstancesWithCustomVariables", "UserTasks", 0, 10, TaskInstance.class);
System.out.println(tasks);

This will return task instances directly from the data set without any filtering, use the UserTasks mapper to build up the object representation and apply paging - first page with 10 results at most.

Now it's time to use the more advanced query capabilities and start filtering by process variables. As described in the Advanced queries in jBPM 6.4 article, to be able to map custom variables we need to provide their column mapping - name and type. Following is an example that searches for tasks where:

  • processInstanceId is between 1000 and 2000 - number range condition
  • price is over 800 - number comparison condition
  • sale date is between 01.02.2016 and 01.03.2016 - date range condition
  • the product being sold is EAP or Wildfly - group (IN) condition
  • results are ordered descending by saleDate and country

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
              
Date from = sdf.parse("2016-02-01");                        
Date to = sdf.parse("2016-03-01");
        
QueryFilterSpec spec = new QueryFilterSpecBuilder()
        .between("processInstanceId", 1000, 2000)
        .greaterThan("price", 800)
        .between("saleDate", from, to)
        .in("productCode", Arrays.asList("EAP", "WILDFLY"))
        .oderBy("saleDate, country", false)
        .addColumnMapping("COUNTRY", "string")
        .addColumnMapping("PRODUCTCODE", "string")
        .addColumnMapping("QUANTITY", "integer")
        .addColumnMapping("PRICE", "double")
        .addColumnMapping("SALEDATE", "date")
        .get();
        
List<TaskInstance> tasks = queryClient.query("getAllTaskInstancesWithCustomVariables", "UserTasksWithCustomVariables", spec, 0, 10, TaskInstance.class);
System.out.println(tasks);

The query in the above example uses QueryFilterSpec (and its builder), which allows query parameters and sorting options to be specified. In addition it allows column mappings to be given for custom elements so they are returned as variables next to the default task detail columns. These column mappings are then delivered to the mapper that transforms the results - in this case we used the built-in mapper UserTasksWithCustomVariables, which collects all task details plus the given column mappings as custom variable data.

QueryFilterSpec maps to the use of QueryParams in the services api, so it inherits the same limitation - all conditions are AND based, meaning all of them must match to get a hit.

To overcome this limitation, the services api introduced QueryParamBuilder so users can build advanced filters. The same is available on KIE Server, though builders need to be built and included in one of the following:

  • KIE Server itself (like in WEB-INF/lib)
  • Inside a project - kjar
  • Inside a project's dependency
Implementing a QueryParamBuilder to be used in KIE Server requires a factory so it can be discovered and created at query time - every time a query is issued a new instance of QueryParamBuilder will be requested with the given parameters.

Using QueryParamBuilder in KIE Server

To be able to use a QueryParamBuilder the user needs to:
  • Implement QueryParamBuilder that will produce a new instance every time it is requested, given a map of parameters

public class TestQueryParamBuilder implements QueryParamBuilder<ColumnFilter> {

    private Map<String, Object> parameters;
    private boolean built = false;
    public TestQueryParamBuilder(Map<String, Object> parameters) {
        this.parameters = parameters;
    }
    
    @Override
    public ColumnFilter build() {
        // return null if it was already invoked
        if (built) {
            return null;
        }
        
        String columnName = "processInstanceId";
        
        ColumnFilter filter = FilterFactory.OR(
                FilterFactory.greaterOrEqualsTo(((Number)parameters.get("min")).longValue()),
                FilterFactory.lowerOrEqualsTo(((Number)parameters.get("max")).longValue()));
        filter.setColumnId(columnName);
       
        built = true;
        return filter;
    }

}
The above builder will produce a filter that accepts tasks whose processInstanceId is greater than or equal to min or lower than or equal to max, where min and max are given with each issued query as part of the request.
  • Implement QueryParamBuilderFactory 
public class TestQueryParamBuilderFactory implements QueryParamBuilderFactory {

    @Override
    public boolean accept(String identifier) {
        if ("test".equalsIgnoreCase(identifier)) {
            return true;
        }
        return false;
    }

    @Override
    public QueryParamBuilder newInstance(Map<String, Object> parameters) {
        return new TestQueryParamBuilder(parameters);
    }

}
The factory is responsible for returning new instances of the query param builder only if the given identifier is accepted by the factory. The identifier is given as part of the query request and only one query param builder factory can be selected based on it. In this case the "test" identifier needs to be given to use this factory, and in turn the query param builder.

There is one last tiny bit required to make this work - we need to make it discoverable, so let's add a service file into the META-INF folder of the jar that packages these implementations.

META-INF/services/org.jbpm.services.api.query.QueryParamBuilderFactory

where the content of this file is the fully qualified class name of the factory.
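
For example, if the factory above lived in a (hypothetical) org.jbpm.test.query package, the file would contain the single line:

org.jbpm.test.query.TestQueryParamBuilderFactory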

With this, we can issue a request that makes use of the newly created query param builder for advanced filters:

Map<String, Object> params = new HashMap<String, Object>();
params.put("min", 10);
params.put("max", 20);

List<TaskInstance> instances = queryClient.query("getAllTaskInstancesWithCustomVariables", "UserTasksWithCustomVariables", "test", params, 0, 10, TaskInstance.class);
So what we have done here:

  • referenced the registered query by name - getAllTaskInstancesWithCustomVariables
  • referenced the mapper by name - UserTasksWithCustomVariables
  • referenced the query param builder by its identifier - test
  • sent params (min and max) that will be used by the new instance of the query builder before the query is executed

In a similar way you can register and use custom mappers, and it is even simpler than with query param builders as there is no need for a factory - the services api comes with a registry that KIE Server uses to register mappers found via ServiceLoader based discovery.

Implement mapper so it can be used in KIE Server:


public class ProductSaleQueryMapper extends UserTaskInstanceWithCustomVarsQueryMapper {

    private static final long serialVersionUID = 3299692663640707607L;

    public ProductSaleQueryMapper() {
        super(getVariableMapping());
    }

    protected static Map<String, String> getVariableMapping() {
        Map<String, String> variablesMap = new HashMap<String, String>();
        
        variablesMap.put("COUNTRY", "string");
        variablesMap.put("PRODUCTCODE", "string");
        variablesMap.put("QUANTITY", "integer");
        variablesMap.put("PRICE", "double");
        variablesMap.put("SALEDATE", "date");
        
        return variablesMap;
    }

    @Override
    public String getName() {
        return "ProductSale";
    }
}

Here we simply extend UserTaskInstanceWithCustomVarsQueryMapper and provide the column mapping directly, so the mapper can be used without specifying column mappings at request level. To be able to use it, the mapper needs to be made discoverable, so we create a service file within the META-INF folder of the jar that packages this implementation.

META-INF/services/org.jbpm.services.api.query.QueryResultMapper

where the content of this file is the fully qualified class name of the mapper.

Now we can directly use it by referencing it by name:

List<TaskInstance> tasks = queryClient.query("getAllTaskInstancesWithCustomVariables", "ProductSale", 0, 10, TaskInstance.class);
System.out.println(tasks);

Raw REST API use of described examples

Get query definitions
Endpoint:
  • http://localhost:8230/kie-server/services/rest/server/queries/definitions?page=0&pageSize=10
Method:
  • GET

Register query definition
Endpoint:
  • http://localhost:8230/kie-server/services/rest/server/queries/definitions/getAllTaskInstancesWithCustomVariables
Method:
  • POST
Request body:
{
  "query-name" : "getAllTaskInstancesWithCustomVariables",
  "query-source" : "java:jboss/datasources/ExampleDS",
  "query-expression" : "select ti.*,  c.country, c.productCode, c.quantity, c.price, c.saleDate from AuditTaskImpl ti     inner join (select mv.map_var_id, mv.taskid from MappedVariable mv) mv       on (mv.taskid = ti.taskId)     inner join ProductSale c       on (c.id = mv.map_var_id)",
  "query-target" : "CUSTOM"
}

Query for tasks - no filtering
Endpoint:
  • http://localhost:8230/kie-server/services/rest/server/queries/definitions/getAllTaskInstancesWithCustomVariables/data?mapper=UserTasks&orderBy=&page=0&pageSize=10
Method:
  • GET


Query with filter spec
Endpoint:
  • http://localhost:8230/kie-server/services/rest/server/queries/definitions/getAllTaskInstancesWithCustomVariables/filtered-data?mapper=UserTasksWithCustomVariables&page=0&pageSize=10
Method:
  • POST
Request body:
{
  "order-by" : "saleDate, country",
  "order-asc" : false,
  "query-params" : [ {
    "cond-column" : "processInstanceId",
    "cond-operator" : "BETWEEN",
    "cond-values" : [ 1000, 2000 ]
  }, {
    "cond-column" : "price",
    "cond-operator" : "GREATER_THAN",
    "cond-values" : [ 800 ]
  }, {
    "cond-column" : "saleDate",
    "cond-operator" : "BETWEEN",
    "cond-values" : [ {"java.util.Date":1454281200000}, {"java.util.Date":1456786800000} ]
  }, {
    "cond-column" : "productCode",
    "cond-operator" : "IN",
    "cond-values" : [ "EAP", "WILDFLY" ]
  } ],
  "result-column-mapping" : {
    "PRICE" : "double",
    "PRODUCTCODE" : "string",
    "COUNTRY" : "string",
    "SALEDATE" : "date",
    "QUANTITY" : "integer"
  }
}

Query with custom query param builder
Endpoint:
  • http://localhost:8230/kie-server/services/rest/server/queries/definitions/getAllTaskInstancesWithCustomVariables/filtered-data?mapper=UserTasksWithCustomVariables&builder=test&page=0&pageSize=10
Method:
  • POST
Request body:
{
  "min" : 10,
  "max" : 20
}

Query for tasks - custom mapper
Endpoint:
  • http://localhost:8230/kie-server/services/rest/server/queries/definitions/getAllTaskInstancesWithCustomVariables/data?mapper=ProductSale&orderBy=&page=0&pageSize=10
Method:
  • GET

With this, we have gone over the support for advanced queries in KIE Server's BPM capability.

As usual, feedback is welcome :)



Advanced queries in jBPM 6.4

While working with BPM, access to the data being processed by the engine is very important. In many cases users would like to have options to easily and efficiently search for different data:

  • process instances started by...
  • process instances not completed until...
  • tasks assigned to ... for a given project
  • tasks not started for a given amount of time
  • process instances with given process variable(s)
  • tasks with given task variable(s)
These are just a few examples of advanced queries that are useful but might be tricky to provide out of the box because:
  • different data bases have different capabilities when it comes to efficient searches
  • the ORM in between adds a layer of complexity, even though it helps to mitigate db differences
  • the out of the box solution relies on compile time data that can be used in queries - like JPA entities
  • it is not possible to build a single data structure that fits all cases and is efficient to query

Again, just a few items that make the out of the box queries limited in terms of functionality. jBPM 6.3 comes with efficient query builders based on the JPA Criteria API that aim at solving many of the issues listed above, but they are constrained by compile time dependencies - being a JPA based solution, the entity manager must be aware of all possible types used in queries.

What's new in 6.4?

jBPM 6.4 comes with a solution to address these problems, based on DashBuilder DataSets. DataSets are like database views - users can define them to pre-filter and aggregate data before it is queried or filtered further.

QueryService is part of the jbpm services api - a cross framework api built to simplify usage of jBPM in embedded use cases. At the same time the jbpm services api is the backbone of both KIE workbench and KIE Server (with its BPM capabilities).

QueryService exposes a simple yet powerful set of operations:
  • Management operations
    • register query definition
    • replace query definition
    • unregister query definition
    • get query definition
    • get queries
  • Runtime operations
    • query - with two flavors:
      • simple based on QueryParam as filter provider
      • advanced based on QueryParamBuilder as filter provider 
DashBuilder DataSets provide support for multiple data sources (CSV, SQL, Elasticsearch, etc.) while jBPM - since its backend is RDBMS based - focuses on SQL based data sets. So the jBPM QueryService exposes a subset of DashBuilder DataSets capabilities to allow efficient queries with a simple API.

How to use it?

Let's define a use case that we can use throughout this article...
We are about to sell software, and for that we define a very simple process that deals with the sale operation. We also have a data model defined that represents our product sale:
ProductSale:
    String productCode
    String country
    Double price
    Integer quantity
    Date saleDate



As you can see the process is very simple but aims at doing a few important things:
  • make use of both processes and user tasks
  • deal with a custom data model as process and user task variables
  • allow process and task variables to be stored externally (here as a JPA entity)
To be able to take advantage of the advanced queries we need to make sure jBPM is processing a reasonable amount of data, so we can properly measure how easily we can find the relevant pieces. For that we create 10 000 process instances (and with them 10 000 user tasks) that we can then try to search for using different criteria.

Define query definitions

The first thing the user needs to do is to define a data set - a view of the data you want to work with - the so called QueryDefinition in the services api.
 
SqlQueryDefinition query = new SqlQueryDefinition("getAllProcessInstances", "java:jboss/datasources/ExampleDS");
query.setExpression("select * from processinstancelog");
        
queryService.registerQuery(query);

This is about as simple as a query definition can be:

  • the constructor takes
    • a unique name that identifies it at runtime
    • the data source JNDI name used when performing queries on this definition - in other words the source of data
  • the expression - the most important part - is the SQL statement that builds up the view to be filtered when performing queries
Once we have the SQL query definition we can register it so it can be used later for actual queries.

Perform basic queries

Next make use of it by using queryService.query methods:

Collection<ProcessInstanceDesc> instances = queryService.query("getAllProcessInstances", ProcessInstanceQueryMapper.get(), new QueryContext());

What happened here...

  • we referenced the registered query by name - getAllProcessInstances
  • we provided ProcessInstanceQueryMapper that will be responsible for mapping our data to object instances
  • we provided default query context that enables paging and sorting
Let's see it with query context configuration...

QueryContext ctx = new QueryContext(0, 100, "start_date", true);
        
Collection<ProcessInstanceDesc> instances = queryService.query("getAllProcessInstances", ProcessInstanceQueryMapper.get(), ctx);

Here we query the same query definition (data set) but we want to get 100 results starting at 0, ordered ascending by start date.

But that's not advanced at all... it's just doing paging and sorting on a single table... so let's add filtering to the mix.

// single filter param
Collection<ProcessInstanceDesc> instances = queryService.query("getAllProcessInstances", ProcessInstanceQueryMapper.get(), new QueryContext(), QueryParam.likeTo(COLUMN_PROCESSID, true, "org.jbpm%"));

// multiple filter params (AND)
Collection<ProcessInstanceDesc> instances = queryService.query("getAllProcessInstances", ProcessInstanceQueryMapper.get(), new QueryContext(),
 QueryParam.likeTo(COLUMN_PROCESSID, true, "org.jbpm%"),
 QueryParam.equalsTo(COLUMN_STATUS, 1, 3));

here we have filtered our data set:

  • first query - by process id that matches "org.jbpm%"
  • second query - by process id that matches "org.jbpm%" and status is active or aborted

But that's still not very advanced, is it? Let's look at how to work with variables.

Perform queries with process and task variables

A common use case is to find process instances or tasks that have a given variable, or have a given variable with a particular value.

jBPM from version 6.4 indexes task variables in the database (in previous versions it already did that for process instance variables). The indexing mechanism is configurable, but the default is a simple toString on the variable, kept in a single table:

  • Process instance variables - VariableInstanceLog table
  • Task variables - TaskVariableImpl table
equipped with this information we can define data sets that will allow us to query for task and process variables.

// process instances with variables
SqlQueryDefinition query = new SqlQueryDefinition("getAllProcessInstancesWithVariables", "java:jboss/datasources/ExampleDS");
query.setExpression("select pil.*, v.variableId, v.value " +
                       "from ProcessInstanceLog pil " +
                            "INNER JOIN (select vil.processInstanceId ,vil.variableId, MAX(vil.ID) maxvilid  FROM VariableInstanceLog vil " +
                                "GROUP BY vil.processInstanceId, vil.variableId ORDER BY vil.processInstanceId)  x " +
                                "ON (v.variableId = x.variableId  AND v.id = x.maxvilid )" +
                            "INNER JOIN VariableInstanceLog v " +        
                                "ON (v.processInstanceId = pil.processInstanceId)");
        
queryService.registerQuery(query);

// tasks with variables
query = new SqlQueryDefinition("getAllTaskInputInstancesWithVariables", "java:jboss/datasources/ExampleDS");
query.setExpression("select ti.*, tv.name tvname, tv.value tvvalue "+
                        "from AuditTaskImpl ti " +
                             "inner join (select tv.taskId, tv.name, tv.value from TaskVariableImpl tv where tv.type = 0 ) tv "+
                                "on (tv.taskId = ti.taskId)");
        
queryService.registerQuery(query);

Now we have registered new query definitions that will allow us to search for processes and tasks and return variables as part of the query.

NOTE: usually when creating query definitions we don't want the data set to simply mirror the source tables, so it's good practice to narrow down the amount of data up front, for example by defining it for a given project (deploymentId) or process id. Keep in mind that you can have as many query definitions as you like.
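
As a sketch of such a narrowed-down definition - assuming (hypothetically) that our project's deployment id is org.jbpm:product-sale:1.0 and relying on the externalId column of ProcessInstanceLog, which holds the deployment id:

SqlQueryDefinition projectQuery = new SqlQueryDefinition("getProductSaleProcessInstances", "java:jboss/datasources/ExampleDS");
// limit the data set to a single project - externalId stores the deployment id
projectQuery.setExpression("select * from ProcessInstanceLog where externalId = 'org.jbpm:product-sale:1.0'");

queryService.registerQuery(projectQuery);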

Now it's time to make use of these queries to fetch some results

Get process instances with variables:

List<ProcessInstanceWithVarsDesc> processInstanceLogs = queryService.query("getAllProcessInstancesWithVariables", ProcessInstanceWithVarsQueryMapper.get(), new QueryContext(), QueryParam.equalsTo(COLUMN_VAR_NAME, "approval_document"));

So we are able to find process instances that have variable called 'approval_document'...

Get tasks with variables:

List<UserTaskInstanceWithVarsDesc> taskInstanceLogs = queryService.query("getAllTaskInputInstancesWithVariables", UserTaskInstanceWithVarsQueryMapper.get(), new QueryContext(), 
                                                            QueryParam.equalsTo(COLUMN_TASK_VAR_NAME, "Comment"), 
                                                            QueryParam.equalsTo(COLUMN_TASK_VAR_VALUE, "Write a Document"));

... and here we can find tasks that have task variable 'Comment' and with value 'Write a Document'.

So, a bit of progress with more advanced queries, but still nothing that couldn't be done with the out of the box queries. The main limitation of the out of the box variable indexes is that values are always stored as strings and thus cannot be efficiently compared on the db side using operators like >, <, between, etc.

... but wait - with query definitions you can take advantage of the SQL used to create your data view, and thereby use database specific functions that cast or convert strings into different types of data. With this you can tune the query definition to provide a subset of data with converted types. Of course that comes with a performance penalty depending on the conversion type and the amount of data.
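
As an illustration only - the cast syntax and available functions are database specific, and the variable name used here is hypothetical - such a definition could look like this:

SqlQueryDefinition castQuery = new SqlQueryDefinition("getProcessInstancesWithNumericAmount", "java:jboss/datasources/ExampleDS");
// cast the string value kept in VariableInstanceLog to a number so >, < and between work on the db side
castQuery.setExpression("select pil.*, CAST(v.value AS numeric) amount " +
                        "from ProcessInstanceLog pil " +
                        "    inner join VariableInstanceLog v " +
                        "      on (v.processInstanceId = pil.processInstanceId) " +
                        "where v.variableId = 'amount'");

queryService.registerQuery(castQuery);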

Another way to cover this use case is to externalize process and task variables (at least those that shall be queryable) and keep them in separate table(s). jBPM comes with so called pluggable variable persistence strategies and ships a JPA based one out of the box. So you can create your process variable as an entity and it will be stored in a separate table. You can then take advantage of the mapping support (org.drools.persistence.jpa.marshaller.VariableEntity) that ensures the mapping between your entity and the process instance/task is maintained.

Here is a sample ProductSale object that is defined as a JPA entity and will be stored in a separate table:

@javax.persistence.Entity
public class ProductSale extends org.drools.persistence.jpa.marshaller.VariableEntity implements java.io.Serializable
{

   static final long serialVersionUID = 1L;

   @javax.persistence.GeneratedValue(strategy = javax.persistence.GenerationType.AUTO, generator = "PRODUCTSALE_ID_GENERATOR")
   @javax.persistence.Id
   @javax.persistence.SequenceGenerator(name = "PRODUCTSALE_ID_GENERATOR", sequenceName = "PRODUCTSALE_ID_SEQ")
   private java.lang.Long id;

   private java.lang.String productCode;

   private java.lang.String country;

   private java.lang.Double price;

   private java.lang.Integer quantity;

   private java.util.Date saleDate;

   public ProductSale()
   {
   }

   public java.lang.Long getId()
   {
      return this.id;
   }

   public void setId(java.lang.Long id)
   {
      this.id = id;
   }

   public java.lang.String getProductCode()
   {
      return this.productCode;
   }

   public void setProductCode(java.lang.String productCode)
   {
      this.productCode = productCode;
   }

   public java.lang.String getCountry()
   {
      return this.country;
   }

   public void setCountry(java.lang.String country)
   {
      this.country = country;
   }

   public java.lang.Double getPrice()
   {
      return this.price;
   }

   public void setPrice(java.lang.Double price)
   {
      this.price = price;
   }

   public java.lang.Integer getQuantity()
   {
      return this.quantity;
   }

   public void setQuantity(java.lang.Integer quantity)
   {
      this.quantity = quantity;
   }

   public java.util.Date getSaleDate()
   {
      return this.saleDate;
   }

   public void setSaleDate(java.util.Date saleDate)
   {
      this.saleDate = saleDate;
   }
}

When such an entity is then used as a process or task variable it will be stored in the ProductSale table and referenced via a mapping in the MappedVariable table, so it can be joined to find process or task instances holding that variable.
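
For illustration, this is roughly how such an entity could be passed as a process variable when starting a process through the services api - the process id and variable name are hypothetical and depend on your process definition:

ProductSale sale = new ProductSale();
sale.setProductCode("EAP");
sale.setCountry("Brazil");
sale.setPrice(1000.0);
sale.setQuantity(2);
sale.setSaleDate(new Date());

Map<String, Object> variables = new HashMap<String, Object>();
variables.put("productSale", sale);

// processService is org.jbpm.services.api.ProcessService, part of the same jbpm services api;
// deploymentId identifies the deployed kjar
long processInstanceId = processService.startProcess(deploymentId, "product-sale.sale", variables);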

Here we can make use of different data types in that entity - string, integer, double, date, long - and with that use various type aware operators to filter data efficiently. So let's define another data set that will provide us with tasks that can be filtered by product sale details.

// tasks with custom variable information
SqlQueryDefinition query = new SqlQueryDefinition("getAllTaskInstancesWithCustomVariables", "java:jboss/datasources/ExampleDS");
query.setExpression("select ti.*,  c.country, c.productCode, c.quantity, c.price, c.saleDate " +
                       "from AuditTaskImpl ti " +
                       "    inner join (select mv.map_var_id, mv.taskid from MappedVariable mv) mv " +
                       "      on (mv.taskid = ti.taskId) " +
                       "    inner join ProductSale c " +
                       "      on (c.id = mv.map_var_id)");
        
queryService.registerQuery(query);

// tasks with custom variable information with assignment filter
SqlQueryDefinition queryTPO = new SqlQueryDefinition("getMyTaskInstancesWithCustomVariables", "java:jboss/datasources/ExampleDS", Target.PO_TASK);
queryTPO.setExpression("select ti.*,  c.country, c.productCode, c.quantity, c.price, c.saleDate, oe.id oeid " +
                            "from AuditTaskImpl ti " +
                            "    inner join (select mv.map_var_id, mv.taskid from MappedVariable mv) mv " +
                            "      on (mv.taskid = ti.taskId) " +
                            "    inner join ProductSale c " +
                            "      on (c.id = mv.map_var_id), " +
                            "  PeopleAssignments_PotOwners po, OrganizationalEntity oe " +
                            "    where ti.taskId = po.task_id and po.entity_id = oe.id");
        
queryService.registerQuery(queryTPO);

here we registered two additional query definitions:

  • the first loads both task info and product sale info into the data set
  • the second is the same as the first but joined with potential owner information, so it returns tasks only for authorized users
In the second query you can notice the third parameter in the constructor which defines the target - this mainly instructs QueryService to apply default filters, such as the user or group filter for potential owners. The same filter parameters can be set manually, so it's just a shortcut given by the API.



(Screenshot of the resulting data set: columns marked in blue are the variables from the custom table, and in orange the task details.)

Now we can perform queries that benefit from the externally stored variable information, finding tasks by various properties (of different types) using various operators.

Map<String, String> variableMap = new HashMap<String, String>();
variableMap.put("COUNTRY", "string");
variableMap.put("PRODUCTCODE", "string");
variableMap.put("QUANTITY", "integer");
variableMap.put("PRICE", "double");
variableMap.put("SALEDATE", "date");

// let's find tasks for product EAP and country Brazil with status Ready or Reserved
List<UserTaskInstanceWithVarsDesc> taskInstanceLogs = queryService.query(query.getName(), 
                UserTaskInstanceWithCustomVarsQueryMapper.get(variableMap), new QueryContext(), 
                QueryParam.equalsTo("productCode", "EAP"), 
                QueryParam.equalsTo("country", "Brazil"), 
                QueryParam.in("status", Arrays.asList(Status.Ready.toString(), Status.Reserved.toString())));


// now let's search for tasks that are for EAP and sales data between beginning and end of February
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
              
Date from = sdf.parse("2016-02-01");                        
Date to = sdf.parse("2016-03-01");
taskInstanceLogs = queryService.query(query.getName(), 
                UserTaskInstanceWithCustomVarsQueryMapper.get(variableMap), new QueryContext(), 
                QueryParam.equalsTo("productCode", "EAP"), 
                QueryParam.between("saleDate", from, to),
                QueryParam.in("status", Arrays.asList(Status.Ready.toString(), Status.Reserved.toString())));


Here you can see how easy and efficient queries can be using variables stored externally. You can take advantage of type based operators to effectively narrow down the results.

As you might have noticed, this time we use another type of mapper - UserTaskInstanceWithCustomVarsQueryMapper - that is responsible for mapping both task information and custom variables. Thus we need to provide the column mapping - name and type - so the mapper knows how to read data from the database and preserve the actual type.

Mappers are rather powerful and thus pluggable - you can implement your own mappers that transform the result into whatever type you like. jBPM comes with the following mappers out of the box:

  • org.jbpm.kie.services.impl.query.mapper.ProcessInstanceQueryMapper
    • registered with name - ProcessInstances
  • org.jbpm.kie.services.impl.query.mapper.ProcessInstanceWithVarsQueryMapper
    • registered with name - ProcessInstancesWithVariables
  • org.jbpm.kie.services.impl.query.mapper.ProcessInstanceWithCustomVarsQueryMapper
    • registered with name - ProcessInstancesWithCustomVariables
  • org.jbpm.kie.services.impl.query.mapper.UserTaskInstanceQueryMapper
    • registered with name - UserTasks
  • org.jbpm.kie.services.impl.query.mapper.UserTaskInstanceWithVarsQueryMapper
    • registered with name - UserTasksWithVariables
  • org.jbpm.kie.services.impl.query.mapper.UserTaskInstanceWithCustomVarsQueryMapper
    • registered with name - UserTasksWithCustomVariables
  • org.jbpm.kie.services.impl.query.mapper.TaskSummaryQueryMapper
    • registered with name - TaskSummaries


Mappers are registered by name to simplify their lookup and to avoid a compile time dependency on the actual mapper implementation. Instead you can use:

org.jbpm.services.api.query.NamedQueryMapper

which simply expects the name of the actual mapper, resolved at the time the query is performed.
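
A minimal sketch of how it can be used, referencing the ProcessInstances mapper by name:

Collection<ProcessInstanceDesc> instances = queryService.query("getAllProcessInstances",
        new NamedQueryMapper<Collection<ProcessInstanceDesc>>("ProcessInstances"), new QueryContext());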

Here you can find complete product-sale project that can be imported into KIE Workbench for inspection and customization.

QueryParamBuilder

Last but not least is the QueryParamBuilder, which provides a more advanced way of building filters for our data sets. By default, when using the query method of QueryService that accepts zero or more QueryParam instances (as we have seen in the above examples), all of these params are joined with the AND operator, meaning all of them must match. That's not always what is needed, which is why QueryParamBuilder has been introduced - users can build their own builders and provide them at the time the query is issued.
QueryParamBuilder is a simple interface whose build method is invoked repeatedly, for as long as it returns a non-null value, before the query is performed. So you can build up complex filter options that could not be expressed by a simple list of QueryParams.

Here is a basic implementation of QueryParamBuilder to give you a jump start on implementing your own - note that it relies on the DashBuilder Dataset API.

public class TestQueryParamBuilder implements QueryParamBuilder<ColumnFilter> {

    private Map<String, Object> parameters;
    private boolean built = false;
    public TestQueryParamBuilder(Map<String, Object> parameters) {
        this.parameters = parameters;
    }
    
    @Override
    public ColumnFilter build() {
        // return null if it was already invoked
        if (built) {
            return null;
        }
        
        String columnName = "processInstanceId";
        
        ColumnFilter filter = FilterFactory.OR(
                FilterFactory.greaterOrEqualsTo((Long)parameters.get("min")),
                FilterFactory.lowerOrEqualsTo((Long)parameters.get("max")));
        filter.setColumnId(columnName);
       
        built = true;
        return filter;
    }

}
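
A hedged sketch of how such a builder can be plugged into a query - QueryService accepts a QueryParamBuilder in place of the list of QueryParams:

Map<String, Object> params = new HashMap<String, Object>();
params.put("min", 10L);
params.put("max", 20L);

Collection<ProcessInstanceDesc> instances = queryService.query("getAllProcessInstances",
        ProcessInstanceQueryMapper.get(), new QueryContext(), new TestQueryParamBuilder(params));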


This concludes the introduction to the new QueryService based on the Dashbuilder Dataset API, which allows tailored queries against all possible data including (but not limited to) jBPM data.

This article focused on jbpm services api but this functionality is also available in KIE Server for remote use cases. Stay tuned for another article describing remote capabilities.


2015/12/03

KIE Server: Extend KIE Server client with new capabilities

The last but not least part of the KIE Server extensions series is about extending the KIE Server Client with additional capabilities.

Use case

On top of what was built in the second article (adding a Mina transport to KIE Server), we need to add a KIE Server Client extension that allows the Mina transport to be used with the unified KIE Server Client API.

Before you start, create an empty Maven project (packaging jar) with the following dependencies:

<properties>
    <version.org.kie>6.4.0-SNAPSHOT</version.org.kie>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-client</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
  </dependencies>

Design ServicesClient API interface

The first thing we need to do is decide what API should be exposed to the callers of our client. Since the Mina extension builds on top of the Drools one, let's provide the same capabilities as RuleServicesClient:

public interface RulesMinaServicesClient extends RuleServicesClient {

}

As you can notice it simply extends the default RuleServicesClient interface and thus provides the same capabilities.

Why do we need an additional interface for it? Because we are going to register client implementations based on their interface, and there can be only one implementation for a given interface.

Implement RulesMinaServicesClient

The next step is to actually implement the client; here we are going to use plain socket based communication for simplicity's sake. We could use the Apache Mina client API, though this would introduce an additional dependency which we don't need for a sample implementation.

Note that this client implementation is very simple and in many cases can be improved, but the point here is to show how it can be implemented rather than provide bullet proof code.

So a few aspects to remember when reviewing the implementation:
  • it relies on the default configuration from the KIE Server client and thus uses serverUrl as the place to provide the host and port of the Mina server
  • it hardcodes JSON as the marshalling format
  • the decision whether the response is a success or failure is based on checking if the received message is a JSON object (starts with {) - very naive, though it works for simple cases
  • it uses direct socket communication with a blocking api, waiting for the first line of the response and then reading all lines that are available
  • it does not use "stream mode", meaning it disconnects from the server after invoking a command
Here is the implementation
public class RulesMinaServicesClientImpl implements RulesMinaServicesClient {
    
    private String host;
    private Integer port;
    
    private Marshaller marshaller;
    
    public RulesMinaServicesClientImpl(KieServicesConfiguration configuration, ClassLoader classloader) {
        String[] serverDetails = configuration.getServerUrl().split(":");
        
        this.host = serverDetails[0];
        this.port = Integer.parseInt(serverDetails[1]);
        
        this.marshaller = MarshallerFactory.getMarshaller(configuration.getExtraJaxbClasses(), MarshallingFormat.JSON, classloader);
    }

    public ServiceResponse<String> executeCommands(String id, String payload) {
        
        try {
            String response = sendReceive(id, payload);
            if (response.startsWith("{")) {
                return new ServiceResponse<String>(ResponseType.SUCCESS, null, response);
            } else {
                return new ServiceResponse<String>(ResponseType.FAILURE, response);
            }
        } catch (Exception e) {
            throw new KieServicesException("Unable to send request to KIE Server", e);
        }
    }

    public ServiceResponse<String> executeCommands(String id, Command<?> cmd) {
        try {
            String response = sendReceive(id, marshaller.marshall(cmd));
            if (response.startsWith("{")) {
                return new ServiceResponse<String>(ResponseType.SUCCESS, null, response);
            } else {
                return new ServiceResponse<String>(ResponseType.FAILURE, response);
            }
        } catch (Exception e) {
            throw new KieServicesException("Unable to send request to KIE Server", e);
        }
    }

    protected String sendReceive(String containerId, String content) throws Exception {
        
        // content - flatten the content to be single line
        content = content.replaceAll("\\n", "");
        
        Socket minaSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;

        StringBuffer data = new StringBuffer();
        try {
            minaSocket = new Socket(host, port);
            out = new PrintWriter(minaSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(minaSocket.getInputStream()));
        
            // prepare and send data
            out.println(containerId + "|" + content);
            // wait for the first line
            data.append(in.readLine());
            // and then continue as long as it's available
            while (in.ready()) {
                data.append(in.readLine());
            }
            
            return data.toString();
        } finally {
            out.close();
            in.close();
            minaSocket.close();
        }
    }
}

Once we have the client interface and client implementation we need to make it available for the KIE Server Client to find.

Implement KieServicesClientBuilder

org.kie.server.client.helper.KieServicesClientBuilder is the glue interface that allows additional client apis to be provided to the generic KIE Server Client infrastructure. This interface has two methods:
  • getImplementedCapability - which must match the server capability (extension) it is going to use
  • build - which is responsible for providing a map of client implementations where the key is the interface and the value a fully initialized implementation
Here is a simple implementation of the client builder for this use case

public class MinaClientBuilderImpl implements KieServicesClientBuilder {

    public String getImplementedCapability() {
        return "BRM-Mina";
    }

    public Map<Class<?>, Object> build(KieServicesConfiguration configuration, ClassLoader classLoader) {
        Map<Class<?>, Object> services = new HashMap<Class<?>, Object>();

        services.put(RulesMinaServicesClient.class, new RulesMinaServicesClientImpl(configuration, classLoader));

        return services;
    }

}

Make it discoverable

Same story as for the other extensions... once we have everything implemented, it's time to make it discoverable so the KIE Server Client can find and register this extension at runtime. Since the KIE Server Client is based on the Java SE ServiceLoader mechanism we need to add one file into our extension jar file:

META-INF/services/org.kie.server.client.helper.KieServicesClientBuilder

And the content of this file is a single line with the fully qualified class name of our custom implementation of KieServicesClientBuilder.


How to use it

The usage scenario does not differ much from the regular KIE Server Client use case:
  • create client configuration
  • create client instance
  • get service client by type
  • invoke client methods
Here is code that creates a KIE Server Client for RulesMinaServicesClient:

protected RulesMinaServicesClient buildClient() {
    KieServicesConfiguration configuration = KieServicesFactory.newRestConfiguration("localhost:9123", null, null);
    List<String> capabilities = new ArrayList<String>();
    // we need to add explicitly capabilities as the mina client does not respond to get server info requests.
    capabilities.add("BRM-Mina");
    
    configuration.setCapabilities(capabilities);
    configuration.setMarshallingFormat(MarshallingFormat.JSON);
    
    // extraClasses - a Set<Class<?>> with custom model classes (e.g. Person) used for marshalling, assumed to be defined elsewhere
    configuration.addJaxbClasses(extraClasses);
    
    KieServicesClient kieServicesClient =  KieServicesFactory.newKieServicesClient(configuration);
    
    RulesMinaServicesClient rulesClient = kieServicesClient.getServicesClient(RulesMinaServicesClient.class);
    
    return rulesClient;
}
And here is how it is used to invoke operations on KIE Server via the Mina transport:

RulesMinaServicesClient rulesClient = buildClient();

// commandsFactory (a KIE command factory), containerId and extraClasses are assumed to be defined elsewhere in the test
List<Command<?>> commands = new ArrayList<Command<?>>();
BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands, "defaultKieSession");

Person person = new Person();
person.setName("mary");
commands.add(commandsFactory.newInsert(person, "person"));
commands.add(commandsFactory.newFireAllRules("fired"));

ServiceResponse<String> response = rulesClient.executeCommands(containerId, executionCommand);
Assert.assertNotNull(response);

Assert.assertEquals(ResponseType.SUCCESS, response.getType());

String data = response.getResult();

Marshaller marshaller = MarshallerFactory.getMarshaller(extraClasses, MarshallingFormat.JSON, this.getClass().getClassLoader());

ExecutionResultImpl results = marshaller.unmarshall(data, ExecutionResultImpl.class);
Assert.assertNotNull(results);

Object personResult = results.getValue("person");
Assert.assertTrue(personResult instanceof Person);

Assert.assertEquals("mary", ((Person) personResult).getName());
Assert.assertEquals("JBoss Community", ((Person) personResult).getAddress());
Assert.assertEquals(true, ((Person) personResult).isRegistered());

Complete code of this client extension can be found here.

And that's the last extension point for providing more features in KIE Server than given out of the box.

Thanks for reading the entire series of KIE Server extensions and any and all feedback welcome :)

KIE Server: Extend KIE Server with additional transport

There might be some cases where the existing transports in KIE Server won't be sufficient, for whatever reason:

  • not fast enough
  • difficult to deal with string based data formats (JSON, XML)
  • you name it...
So there might be a need to build a custom transport to overcome these limitations.

Use case

Add an additional transport to KIE Server that exposes the Drools capabilities. For this example we will use Apache Mina as the underlying transport framework and we're going to exchange string based data that still relies on the existing marshalling operations. For simplicity's sake we support only the JSON format.

Before you start, create an empty Maven project (packaging jar) with the following dependencies:

<properties>
    <version.org.kie>6.4.0-SNAPSHOT</version.org.kie>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.kie</groupId>
      <artifactId>kie-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie</groupId>
      <artifactId>kie-internal</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-services-common</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-services-drools</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-core</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.mina</groupId>
      <artifactId>mina-core</artifactId>
      <version>2.0.9</version>
    </dependency>

  </dependencies>

Implement KieServerExtension

The main part of this implementation is done by implementing org.kie.server.services.api.KieServerExtension, the main interface of KIE Server extensions. This interface has a number of methods whose implementation depends on the actual needs:

In our case we don't need to do anything when a container is created or disposed, as we simply extend the Drools extension and rely on the complete setup in that component. For this example we are mostly interested in implementing:
  • the init method
  • the destroy method
In these two methods we are going to manage the life cycle of the Apache Mina server.
public interface KieServerExtension {

    boolean isActive();

    void init(KieServerImpl kieServer, KieServerRegistry registry);

    void destroy(KieServerImpl kieServer, KieServerRegistry registry);

    void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);

    void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);

    List<Object> getAppComponents(SupportedTransports type);

    <T> T getAppComponents(Class<T> serviceType);

    String getImplementedCapability();

    List<Object> getServices();

    String getExtensionName();

    Integer getStartOrder();
}

Next, there are a few methods that describe the extension:
  • getImplementedCapability - indicates what kind of capability is covered by this extension; note that the capability should be unique within KIE Server
  • getExtensionName - human readable name of this extension
  • getStartOrder - defines when the given extension should be started; important for extensions that depend on other extensions, like in this case where it depends on Drools (whose start order is 0), so our extension should start after the Drools one - thus set to 20
The remaining methods are left with a standard implementation to fulfill the interface requirements.

Here is the implementation of the KIE Server extension based on Apache Mina:

public class MinaDroolsKieServerExtension implements KieServerExtension {

    private static final Logger logger = LoggerFactory.getLogger(MinaDroolsKieServerExtension.class);

    public static final String EXTENSION_NAME = "Drools-Mina";

    private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.kie.server.drools-mina.ext.disabled", "false"));
    private static final String MINA_HOST = System.getProperty("org.kie.server.drools-mina.ext.host", "localhost");
    private static final int MINA_PORT = Integer.parseInt(System.getProperty("org.kie.server.drools-mina.ext.port", "9123"));
    
    // taken from dependency - Drools extension
    private KieContainerCommandService batchCommandService;
    
    // mina specific 
    private IoAcceptor acceptor;
    
    public boolean isActive() {
        return disabled == false;
    }

    public void init(KieServerImpl kieServer, KieServerRegistry registry) {
        
        KieServerExtension droolsExtension = registry.getServerExtension("Drools");
        if (droolsExtension == null) {
            logger.warn("No Drools extension available, quiting...");
            return;
        }
        
        List<Object> droolsServices = droolsExtension.getServices();
        for( Object object : droolsServices ) {
            // in case given service is null (meaning was not configured) continue with next one
            if (object == null) {
                continue;
            }
            if( KieContainerCommandService.class.isAssignableFrom(object.getClass()) ) {
                batchCommandService = (KieContainerCommandService) object;
                continue;
            } 
        }
        if (batchCommandService != null) {
            acceptor = new NioSocketAcceptor();
            acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
    
            acceptor.setHandler( new TextBasedIoHandlerAdapter(batchCommandService) );
            acceptor.getSessionConfig().setReadBufferSize( 2048 );
            acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
            try {
                acceptor.bind( new InetSocketAddress(MINA_HOST, MINA_PORT) );
                
                logger.info("{} -- Mina server started at {} and port {}", toString(), MINA_HOST, MINA_PORT);
            } catch (IOException e) {
                logger.error("Unable to start Mina acceptor due to {}", e.getMessage(), e);
            }
    
        }
    }

    public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
        if (acceptor != null) {
            acceptor.dispose();
            acceptor = null;
        }
        logger.info("{} -- Mina server stopped", toString());
    }

    public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
        // no op - it's already handled by Drools extension

    }

    public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
        // no op - it's already handled by Drools extension

    }

    public List<Object> getAppComponents(SupportedTransports type) {
        // nothing for supported transports (REST or JMS)
        return Collections.emptyList();
    }

    public <T> T getAppComponents(Class<T> serviceType) {

        return null;
    }

    public String getImplementedCapability() {
        return "BRM-Mina";
    }

    public List<Object> getServices() {
        return Collections.emptyList();
    }

    public String getExtensionName() { 
        return EXTENSION_NAME;
    }

    public Integer getStartOrder() {
        return 20;
    }

    @Override
    public String toString() {
        return EXTENSION_NAME + " KIE Server extension";
    }
}
As can be noticed, the main part of the implementation is in the init method, which is responsible for collecting services from the Drools extension and bootstrapping the Apache Mina server.
Worth noting is the TextBasedIoHandlerAdapter class that is used as the handler on the Mina server and in essence reacts to incoming requests.

Implement Apache Mina handler

Here is the implementation of the handler class that receives a text message and executes it on the Drools service.

public class TextBasedIoHandlerAdapter extends IoHandlerAdapter {
    
    private static final Logger logger = LoggerFactory.getLogger(TextBasedIoHandlerAdapter.class);

    private KieContainerCommandService batchCommandService;
    
    public TextBasedIoHandlerAdapter(KieContainerCommandService batchCommandService) {
        this.batchCommandService = batchCommandService;
    }

    @Override
    public void messageReceived( IoSession session, Object message ) throws Exception {
        String completeMessage = message.toString();
        logger.debug("Received message '{}'", completeMessage);
        if( completeMessage.trim().equalsIgnoreCase("quit") || completeMessage.trim().equalsIgnoreCase("exit") ) {
            session.close(false);
            return;
        }

        String[] elements = completeMessage.split("\\|");
        logger.debug("Container id {}", elements[0]);
        try {
            ServiceResponse<String> result = batchCommandService.callContainer(elements[0], elements[1], MarshallingFormat.JSON, null);
            
            if (result.getType().equals(ServiceResponse.ResponseType.SUCCESS)) {
                session.write(result.getResult());
                logger.debug("Successful message written with content '{}'", result.getResult());
            } else {
                session.write(result.getMsg());
                logger.debug("Failure message written with content '{}'", result.getMsg()); 
            }
        } catch (Exception e) {
            logger.error("Error while handling message", e);
        }
    }
}

A few details about the handler implementation:
  • each incoming request must be a single line, so make sure everything is flattened to one line before submitting it
  • the container id needs to be passed in that single line, so this handler expects the following format:
    • containerID|payload
  • the response is sent the way it is produced by the marshaller, which can be multiple lines
  • the handler allows a "stream mode" that makes it possible to send commands without disconnecting from the KIE Server session; to quit stream mode send either exit or quit

Make it discoverable

Same story as for the REST extension... once we have everything implemented, it's time to make it discoverable so KIE Server can find and register this extension at runtime. Since KIE Server is based on the Java SE ServiceLoader mechanism we need to add one file into our extension jar file:

META-INF/services/org.kie.server.services.api.KieServerExtension

And the content of this file is a single line with the fully qualified class name of our custom implementation of KieServerExtension.


The last step is to build this project (which results in a jar file) and copy it into:
 kie-server.war/WEB-INF/lib

Since this extension depends on Apache Mina we need to copy mina-core-2.0.9.jar into  kie-server.war/WEB-INF/lib as well.

Usage example

Clone this repository and build the kie-server-demo project. Once you build it you will be able to deploy it to KIE Server, either directly using the KIE Server management REST api or via the KIE workbench controller.

Once it is deployed and KIE Server is started, you should find in the logs that the new KIE Server extension has started:
Drools-Mina KIE Server extension -- Mina server started at localhost and port 9123
Drools-Mina KIE Server extension has been successfully registered as server extension

That means we can now interact with our Apache Mina based transport in KIE Server. So let's give it a go... we could write code to interact with the Mina server, but to avoid another coding exercise let's use... wait for it... telnet :)

Start telnet and connect to KIE Server on port 9123:
telnet 127.0.0.1 9123

once connected you can easily interact with alive and kicking KIE Server:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]}
{
  "results" : [ {
    "key" : "",
    "value" : 1
  } ],
  "facts" : [ ]
}
demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]}
{
  "results" : [ {
    "key" : "",
    "value" : 1
  } ],
  "facts" : [ ]
}
demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"maciek","age":25}}}},{"fire-all-rules":""}]}
{
  "results" : [ {
    "key" : "",
    "value" : 1
  } ],
  "facts" : [ ]
}
exit
Connection closed by foreign host.

where:

  • the demo|{...} lines are the request messages
  • the JSON blocks are the responses
  • exit is the message that ends the session


in the server-side logs you will see something like this:
16:33:40,206 INFO  [stdout] (NioProcessor-2) Hello john
16:34:03,877 INFO  [stdout] (NioProcessor-2) Hello john
16:34:19,800 INFO  [stdout] (NioProcessor-2) Hello maciek

This illustrates the stream mode, where we simply type command after command without disconnecting from the KIE Server.

This concludes the exercise; the complete code for it can be found here.

KIE Server: Extend existing server capability with extra REST endpoint

The first and most likely most frequently required extension to KIE Server is to extend the REST api of an already available extension - Drools or jBPM. There are a few simple steps that need to be done to provide extra endpoints in KIE Server.

Our use case

We are going to extend the Drools extension with an additional endpoint that does a very simple thing - it accepts a list of facts to be inserted, automatically calls fire all rules and retrieves all objects from the ksession.
The endpoint will be bound to the following path:
server/containers/instances/{id}/ksession/{ksessionId}

where:
  • id is the container identifier
  • ksessionId is the name of the ksession within the container to be used
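
For example, with the demo container and the defaultKieSession used in the usage example below, the endpoint resolves to:

server/containers/instances/demo/ksession/defaultKieSession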

Before you start, create an empty Maven project (packaging jar) with the following dependencies:

 
 <properties>
    <version.org.kie>6.4.0-SNAPSHOT</version.org.kie>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.kie</groupId>
      <artifactId>kie-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie</groupId>
      <artifactId>kie-internal</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-services-common</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-services-drools</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-rest-common</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-core</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.2</version>
    </dependency>

  </dependencies>

Implement KieServerApplicationComponentsService

The first step is to implement org.kie.server.services.api.KieServerApplicationComponentsService, which is responsible for delivering REST endpoints (aka resources) to the KIE Server infrastructure; these are then deployed on application start. This interface is very simple and has only one method:

Collection<Object> getAppComponents(String extension, 
                                    SupportedTransports type, Object... services)

this method is invoked by KIE Server when booting up and should return all resources that the REST container should deploy.

The implementation of this method should take the following into consideration:

  • it is called for every extension, and the extension name is provided so custom implementations can decide whether a given call is for them or not
  • supported type - either REST or JMS - in our case it will be REST only
  • services - services dedicated to the given extension that can then be used as part of the custom extension - usually these are engine services
Here is a sample implementation that uses the Drools extension as its base (and thus its services):

 
public class CusomtDroolsKieServerApplicationComponentsService implements KieServerApplicationComponentsService {

    private static final String OWNER_EXTENSION = "Drools";
    
    public Collection<Object> getAppComponents(String extension, SupportedTransports type, Object... services) {
        // skip calls from other than owning extension
        if ( !OWNER_EXTENSION.equals(extension) ) {
            return Collections.emptyList();
        }
        
        RulesExecutionService rulesExecutionService = null;
        KieServerRegistry context = null;
       
        for( Object object : services ) { 
            if( RulesExecutionService.class.isAssignableFrom(object.getClass()) ) { 
                rulesExecutionService = (RulesExecutionService) object;
                continue;
            } else if( KieServerRegistry.class.isAssignableFrom(object.getClass()) ) {
                context = (KieServerRegistry) object;
                continue;
            }
        }
        
        List<Object> components = new ArrayList<Object>(1);
        if( SupportedTransports.REST.equals(type) ) {
            components.add(new CustomResource(rulesExecutionService, context));
        }
        
        return components;
    }

}


What can be seen here is that it only reacts to the Drools extension - calls for other extensions are ignored. Next, it selects RulesExecutionService and KieServerRegistry from the available services. Finally, it creates a new CustomResource (implemented in the next step) and returns it as part of the components list.

Implement REST resource

The next step is to implement the custom REST resource that KIE Server will use to provide the additional functionality. Here we build a simple, single-method resource that:
  • uses POST http method
  • expects the following data to be provided:
    • container id as path argument
    • ksession id as path argument
    • list of facts as message payload 
  • supports all KIE Server data formats:
    • XML - JAXB
    • JSON
    • XML - Xstream
It will then unmarshal the payload into an actual List<?> and create a new InsertCommand for each item in the list. These inserts are then followed by FireAllRules and GetObjects commands. All of them are added as commands of a BatchExecutionCommand and used to call the rule engine. As simple as that. The same thing is already possible with KIE Server out of the box, but it requires the complete BatchExecutionCommand to be set up on the client side. Not that it's not possible, but this extension is tailored for the simple pattern:
insert -> evaluate -> return

Here is what a simple implementation could look like:
 
@Path("server/containers/instances/{id}/ksession")
public class CustomResource {

    private static final Logger logger = LoggerFactory.getLogger(CustomResource.class);
    
    private KieCommands commandsFactory = KieServices.Factory.get().getCommands();

    private RulesExecutionService rulesExecutionService;
    private KieServerRegistry registry;

    public CustomResource() {

    }

    public CustomResource(RulesExecutionService rulesExecutionService, KieServerRegistry registry) {
        this.rulesExecutionService = rulesExecutionService;
        this.registry = registry;
    }
    
    @POST
    @Path("/{ksessionId}")
    @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
    public Response insertFireReturn(@Context HttpHeaders headers, 
            @PathParam("id") String id, 
            @PathParam("ksessionId") String ksessionId, 
            String cmdPayload) {

        Variant v = getVariant(headers);
        String contentType = getContentType(headers);
        
        MarshallingFormat format = MarshallingFormat.fromType(contentType);
        if (format == null) {
            format = MarshallingFormat.valueOf(contentType);
        }
        try {    
            KieContainerInstance kci = registry.getContainer(id);
            
            Marshaller marshaller = kci.getMarshaller(format);
            
            List<?> listOfFacts = marshaller.unmarshall(cmdPayload, List.class);
            
            List<Command<?>> commands = new ArrayList<Command<?>>();
            BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands, ksessionId);
            
            for (Object fact : listOfFacts) {
                commands.add(commandsFactory.newInsert(fact, fact.toString()));
            }
            commands.add(commandsFactory.newFireAllRules());
            commands.add(commandsFactory.newGetObjects());
                
            ExecutionResults results = rulesExecutionService.call(kci, executionCommand);
                    
            String result = marshaller.marshall(results);
            
            
            logger.debug("Returning OK response with content '{}'", result);
            return createResponse(result, v, Response.Status.OK);
        } catch (Exception e) {
            // in case marshalling failed return the call container response to keep backward compatibility
            String response = "Execution failed with error : " + e.getMessage();
            logger.debug("Returning Failure response with content '{}'", response);
            return createResponse(response, v, Response.Status.INTERNAL_SERVER_ERROR);
        }

    }
}


Make it discoverable

Once we have everything implemented, it's time to make it discoverable so KIE Server can find and register this extension at runtime. Since KIE Server is based on the Java SE ServiceLoader mechanism, we need to add one file to our extension jar file:

META-INF/services/org.kie.server.services.api.KieServerApplicationComponentsService

The content of this file is a single line with the fully qualified class name of our custom implementation of KieServerApplicationComponentsService.
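
For example, assuming the CusomtDroolsKieServerApplicationComponentsService implemented above lives in a package such as org.kie.server.ext.drools.rest (the package name is only an illustration - use your actual package), the file would contain just:

org.kie.server.ext.drools.rest.CusomtDroolsKieServerApplicationComponentsService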


The last step is to build this project (which will result in a jar file) and copy the resulting jar into:
 kie-server.war/WEB-INF/lib

And that's all that is needed. Start KIE Server and you can begin interacting with your new REST endpoint that relies on the Drools extension.

Usage example

Clone this repository and build the kie-server-demo project. Once built, you can deploy it to KIE Server either directly (using the KIE Server management REST api) or via the KIE workbench controller.

Once deployed, you can use the following to invoke the new endpoint:
URL: 
http://localhost:8080/kie-server/services/rest/server/containers/instances/demo/ksession/defaultKieSession

HTTP Method: POST
Headers:
Content-Type: application/json
Accept: application/json

Message payload:
[
  {
    "org.jbpm.test.Person": {
      "name": "john",
      "age": 25
    }
  },
  {
    "org.jbpm.test.Person": {
      "name": "mary",
      "age": 22
    }
  }
]
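
For instance, with the payload above saved as people.json, the request could be issued with curl like this (the kieserver user and password are placeholders - use credentials with the kie-server role configured on your installation):

curl -u kieserver:kieserver1! \
     -X POST \
     -H "Content-Type: application/json" \
     -H "Accept: application/json" \
     -d @people.json \
     http://localhost:8080/kie-server/services/rest/server/containers/instances/demo/ksession/defaultKieSession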

The payload is a simple list with two items representing people. Execute the request and you should see the following in the server log:
13:37:20,347 INFO  [stdout] (default task-24) Hello mary
13:37:20,348 INFO  [stdout] (default task-24) Hello john

And the response should contain the objects retrieved after rule evaluation, where each Person object has:
  • address set to 'JBoss Community'
  • registered flag set to true

With this sample use case we illustrated how easy it is to extend the REST api of KIE Server. The complete code for this extension can be found here.

Extending KIE Server capabilities

As a follow up to previous articles about KIE Server, I'd like to present the extensibility support provided by KIE Server. Let's quickly look at the KIE Server architecture...

Extensions overview

KIE Server is built around extensions; every piece of functionality is actually provided by an extension. Out of the box we have the following:

  • KIE Server extension - this is the default extension that provides the management capabilities of the KIE Server - like creating or disposing containers, etc.
  • Drools extension - this extension provides rules (BRMS) capabilities, e.g. inserting facts and firing rules (among others)
  • jBPM extension - this extension provides process (BPMS) capabilities, e.g. business process execution, user tasks, async jobs
  • jBPM UI extension - an additional extension added in 6.4 that depends on the jBPM extension and provides UI related capabilities - forms and process images
With just these out of the box capabilities KIE Server provides quite a bit of coverage. But that's not all... extensions provide the capabilities, but these capabilities must somehow be exposed to users. For that, KIE Server comes with two transports by default:
  • REST
  • JMS
To be able to effectively manage extensions at runtime, they are packaged in separate jar files. So looking at the out of the box extensions we have the following modules:
  • Drools extension
    • kie-server-services-drools
    • kie-server-rest-drools
  • jBPM extension
    • kie-server-services-jbpm
    • kie-server-rest-jbpm
  • jBPM UI extension
    • kie-server-services-jbpm-ui
    • kie-server-rest-jbpm-ui
All of the above modules are automatically discovered at runtime and registered in KIE Server if they are enabled (which by default they are). Extensions can be disabled using the following system properties (see the startup example after this list):
  • Drools extension
    • org.drools.server.ext.disabled = true
  • jBPM extension
    • org.jbpm.server.ext.disabled = true
  • jBPM UI extension
    • org.jbpm.ui.server.ext.disabled = true
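
For example, on WildFly/EAP (assuming the default standalone server that kie-server.war is deployed to), the jBPM extension could be switched off at startup like this:

./standalone.sh -Dorg.jbpm.server.ext.disabled=true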
But this is not all... if someone does not like the client api, it can also be extended by implementing custom interfaces. This is why there is an extra step needed to get a remote client:

kieServerClient.getServicesClient(Interface.class)
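
As a minimal, hypothetical sketch - the CustomRulesClient interface below is not part of KIE Server and would have to be provided and registered by a client-side extension (the subject of one of the articles below) - retrieving such a custom client looks the same as for the built-in ones:

// CustomRulesClient is a hypothetical, user-defined client interface
CustomRulesClient rulesClient = kieServerClient.getServicesClient(CustomRulesClient.class);
// from here on, whatever methods the custom interface declares are available to the caller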

Why are extensions needed?

Let's now look at why someone would consider extending KIE Server:

  • First and foremost, there might be functionality that is not yet exposed by KIE Server but exists in the engines (process or rule engine).
    • REST extension
  • Another use case is that something should be done differently than it is done out of the box - different parameters and so on.
    • client extension
  • Last but not least, it should be possible to extend the transport coverage, meaning users can add other transports next to REST and JMS.
    • server extension
With this, users can first of all cover their requirements even if the out of the box KIE Server implementation does not provide the required functionality. Such extensions can then be contributed for inclusion in the project or shipped as custom extensions available to other users.

This benefits both the project and its users, so I'd like to encourage everyone to look into the details, think about whether anything is missing and, if so, try to solve it by building extensions.

Let's extend KIE Server capabilities

The following three articles provide details on how to build KIE Server extensions:

Important note: While most of this could already be achieved with 6.3.0.Final, I'd strongly recommend giving it a go with 6.4.0 (hence all dependencies refer to 6.4.0-SNAPSHOT), as the extensions have been simplified a lot.