2015/12/03

KIE Server: Extend KIE Server with additional transport

There might be some cases where existing transports in KIE Server won't be sufficient, for whatever reason

  • not fast enough
  • difficult to deal with string based data formats (JSON, XML)
  • you name it..so there might be a need to build a custom transport to overcome this limitation.

Use case

Add additional transport to KIE Server that allows to use Drools capabilities. For this example we will use Apache Mina as underlying transport framework and we're going to exchange string based data that will still rely on existing marshaling operations. For simplicity reason we support only JSON format.

Before you start create empty maven project (packaging jar) with following dependencies:

<properties>
    <version.org.kie>6.4.0-SNAPSHOT</version.org.kie>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.kie</groupId>
      <artifactId>kie-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie</groupId>
      <artifactId>kie-internal</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-api</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-services-common</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.kie.server</groupId>
      <artifactId>kie-server-services-drools</artifactId>
      <version>${version.org.kie}</version>
    </dependency>

    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-core</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.mina</groupId>
      <artifactId>mina-core</artifactId>
      <version>2.0.9</version>
    </dependency>

  </dependencies>

Implement KieServerExtension

Main part of this implementation is done by implementing org.kie.server.services.api.KieServerExtension which is KIE Server extension main interface. This interface has number of methods which implementation depends on the actual needs:

In our case we don't need to do anything when container is created or disposed as we simply extend the Drools extension and rely on complete setup in that component. For this example we are mostly interested in implementing:
  • init method
  • destroy method
in these two methods we are going to manage life cycle of the Apache Mina server. 
public interface KieServerExtension {

    boolean isActive();

    void init(KieServerImpl kieServer, KieServerRegistry registry);

    void destroy(KieServerImpl kieServer, KieServerRegistry registry);

    void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);

    void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters);

    List<Object> getAppComponents(SupportedTransports type);

    <T> T getAppComponents(Class<T> serviceType);

    String getImplementedCapability();

    List<Object> getServices();

    String getExtensionName();

    Integer getStartOrder();
}

Next there are few methods that describe the extension:
  • getImplementedCapability - should instruct what kind of capability is covered by this extension, note that capability should be unique within KIE Server
  • getExtensionName - human readable name of this extension
  • getStartOrder - defined when given extension should be started, important for extensions that have dependencies to other extensions like in this case where it depends on Drools (startup order is set to 0) so our extension should start after drools one - thus set to 20
Remaining methods are left with standard implementation to fulfill interface requirements.

Here is the implementation of the KIE Server extension based on Apache Mina:

public class MinaDroolsKieServerExtension implements KieServerExtension {

    private static final Logger logger = LoggerFactory.getLogger(MinaDroolsKieServerExtension.class);

    public static final String EXTENSION_NAME = "Drools-Mina";

    private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.kie.server.drools-mina.ext.disabled", "false"));
    private static final String MINA_HOST = System.getProperty("org.kie.server.drools-mina.ext.port", "localhost");
    private static final int MINA_PORT = Integer.parseInt(System.getProperty("org.kie.server.drools-mina.ext.port", "9123"));
    
    // taken from dependency - Drools extension
    private KieContainerCommandService batchCommandService;
    
    // mina specific 
    private IoAcceptor acceptor;
    
    public boolean isActive() {
        return disabled == false;
    }

    public void init(KieServerImpl kieServer, KieServerRegistry registry) {
        
        KieServerExtension droolsExtension = registry.getServerExtension("Drools");
        if (droolsExtension == null) {
            logger.warn("No Drools extension available, quiting...");
            return;
        }
        
        List<Object> droolsServices = droolsExtension.getServices();
        for( Object object : droolsServices ) {
            // in case given service is null (meaning was not configured) continue with next one
            if (object == null) {
                continue;
            }
            if( KieContainerCommandService.class.isAssignableFrom(object.getClass()) ) {
                batchCommandService = (KieContainerCommandService) object;
                continue;
            } 
        }
        if (batchCommandService != null) {
            acceptor = new NioSocketAcceptor();
            acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
    
            acceptor.setHandler( new TextBasedIoHandlerAdapter(batchCommandService) );
            acceptor.getSessionConfig().setReadBufferSize( 2048 );
            acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
            try {
                acceptor.bind( new InetSocketAddress(MINA_HOST, MINA_PORT) );
                
                logger.info("{} -- Mina server started at {} and port {}", toString(), MINA_HOST, MINA_PORT);
            } catch (IOException e) {
                logger.error("Unable to start Mina acceptor due to {}", e.getMessage(), e);
            }
    
        }
    }

    public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
        if (acceptor != null) {
            acceptor.dispose();
            acceptor = null;
        }
        logger.info("{} -- Mina server stopped", toString());
    }

    public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
        // no op - it's already handled by Drools extension

    }

    public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
        // no op - it's already handled by Drools extension

    }

    public List<Object> getAppComponents(SupportedTransports type) {
        // nothing for supported transports (REST or JMS)
        return Collections.emptyList();
    }

    public <T> T getAppComponents(Class<T> serviceType) {

        return null;
    }

    public String getImplementedCapability() {
        return "BRM-Mina";
    }

    public List<Object> getServices() {
        return Collections.emptyList();
    }

    public String getExtensionName() { 
        return EXTENSION_NAME;
    }

    public Integer getStartOrder() {
        return 20;
    }

    @Override
    public String toString() {
        return EXTENSION_NAME + " KIE Server extension";
    }
}
As can be noticed main part of implementation is in the init method that is responsible for collecting services from Drools extensions and bootstrapping Apache Mina server.
Worth noticing is the TextBaseIOHandlerAdapter class that is used as handler on Mina server that in essence will react to incoming requests.

Implement Apache Mina handler

Here is the implementation of the handler class that receives text message and executes it on drools service. 

public class TextBasedIoHandlerAdapter extends IoHandlerAdapter {
    
    private static final Logger logger = LoggerFactory.getLogger(TextBasedIoHandlerAdapter.class);

    private KieContainerCommandService batchCommandService;
    
    public TextBasedIoHandlerAdapter(KieContainerCommandService batchCommandService) {
        this.batchCommandService = batchCommandService;
    }

    @Override
    public void messageReceived( IoSession session, Object message ) throws Exception {
        String completeMessage = message.toString();
        logger.debug("Received message '{}'", completeMessage);
        if( completeMessage.trim().equalsIgnoreCase("quit") || completeMessage.trim().equalsIgnoreCase("exit") ) {
            session.close(false);
            return;
        }

        String[] elements = completeMessage.split("\\|");
        logger.debug("Container id {}", elements[0]);
        try {
            ServiceResponse<String> result = batchCommandService.callContainer(elements[0], elements[1], MarshallingFormat.JSON, null);
            
            if (result.getType().equals(ServiceResponse.ResponseType.SUCCESS)) {
                session.write(result.getResult());
                logger.debug("Successful message written with content '{}'", result.getResult());
            } else {
                session.write(result.getMsg());
                logger.debug("Failure message written with content '{}'", result.getMsg()); 
            }
        } catch (Exception e) {
            
        }
    }
}

Few details about the handler implementation:
  • each incoming request is single line, so make sure before submitting anything to it make sure it's single line
  • there is a need to pass container id in this single line so this handler expects following format:
    • containerID|payload
  • response is set the way it is produced by marshaller and that can be multiple lines
  • handlers allows "stream mode" that allows to send commands without disconnecting from KIE Server session. to be able to quit the stream mode - send either exit or quit

Make it discoverable

Same story as for REST extension ... once we have all that needs to be implemented, it's time to make it discoverable so KIE Server can find and register this extension on runtime. Since KIE Server is based on Java SE ServiceLoader mechanism we need to add one file into our extension jar file:

META-INF/services/org.kie.server.services.api.KieServerExtension

And the content of this file is a single line that represents fully qualified class name of our custom implementation of  KieServerExtension.


Last step is to build this project (which will result in jar file) and copy the result into:
 kie-server.war/WEB-INF/lib

Since this extension depends on Apache Mina we need to copy mina-core-2.0.9.jar into  kie-server.war/WEB-INF/lib as well.

Usage example

Clone this repository and build the kie-server-demo project. Once you build it you will be able to deploy it to KIE Server (either directly using KIE Server management REST api) or via KIE workbench controller.

Once deployed and KIE Server started you should find in logs that new KIE Server extension started:
Drools-Mina KIE Server extension -- Mina server started at localhost and port 9123
Drools-Mina KIE Server extension has been successfully registered as server extension

That means we are now interact with our Apache Mina based transport in KIE Server. So let's give it a go... we could write a code to interact with Mina server but to avoid another coding exercise let's use... wait for it .... telnet :)

Start telnet and connect to KIE Server on port 9123:
telnet 127.0.0.1 9123

once connected you can easily interact with alive and kicking KIE Server:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]}
{
  "results" : [ {
    "key" : "",
    "value" : 1
  } ],
  "facts" : [ ]
}
demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"john","age":25}}}},{"fire-all-rules":""}]}
{
  "results" : [ {
    "key" : "",
    "value" : 1
  } ],
  "facts" : [ ]
}
demo|{"lookup":"defaultKieSession","commands":[{"insert":{"object":{"org.jbpm.test.Person":{"name":"maciek","age":25}}}},{"fire-all-rules":""}]}
{
  "results" : [ {
    "key" : "",
    "value" : 1
  } ],
  "facts" : [ ]
}
exit
Connection closed by foreign host.

where:

  • green is request message
  • blue is response
  • orange is exit message


in the server side logs you will see something like this:
16:33:40,206 INFO  [stdout] (NioProcessor-2) Hello john
16:34:03,877 INFO  [stdout] (NioProcessor-2) Hello john
16:34:19,800 INFO  [stdout] (NioProcessor-2) Hello maciek

This illustrated the stream mode where we simply type in commands after command without disconnecting from the KIE Server.

This concludes this exercise and complete code for this can be found here.