Thursday, August 20, 2015

Asynchronous processing with jBPM 6.3

As described in the previous article, the jBPM executor has been enhanced with a more robust and powerful execution mechanism for asynchronous tasks, based on JMS. So let's take a look at the actual improvements by bringing them into real process execution.

The use case is rather simple to understand, but it puts quite a load on the process engine and its asynchronous execution capabilities.

  • a main process that uses a multi instance subprocess to create another process instance to carry out additional processing, and then waits for a signal informing it about completion of the child process
    • one version that uses Call Activity to start the sub process
    • another that uses the AsyncStartProcess command instead of Call Activity
  • a sub process whose responsibility is to execute a job in an asynchronous fashion

Main process with call activity to start sub process


Main process with async start process task to start subprocess
Sub process that is invoked from the main process
So what do we have here, and what's the difference between the two main process versions?

  • the main process will create as many new process instances as given in the collection that is the input to the multi instance subprocess - that is driven by a process variable the user needs to provide on main process start
  • in one version, to create a new process instance as part of the multi instance, it will use the Call Activity BPMN2 construct to create the process - that is the synchronous way
  • in the second version, on the other hand, the multi instance will use the Async Start Process command (via an async task) to start the process instance in an asynchronous way
While these two achieve pretty much the same thing, they differ quite a lot. First of all, using Call Activity will result in the following:
  • the main process instance will not finish until all sub process instances are created - depending on their number, that might take milliseconds, seconds or even minutes (in case of a really huge set of sub process instances)
  • creation of the main process and sub process instances is done in a single transaction - all or nothing, so if one of the subprocesses fails for whatever reason, all will be rolled back, including the main process instance
  • it takes time to commit all data into the database after creating all process instances - note that each process instance (and session instance when using the per process instance strategy) has to be serialized using protobuf and then sent to the database as a byte array, along with all other inserts (for process, tasks, history log etc.). That all takes time and might exceed the transaction timeout, which will cause a rollback again...
When using the async start process command, the situation is slightly different:
  • the main process instance will only wait for the creation of job requests to start all subprocess instances; this does not actually start any process instance yet
  • a rollback will affect only the main process instance and the job requests, meaning the state stays consistent - unless the main process is committed, no sub process instances will be created
  • subprocess instances are started independently, meaning a failure of one instance does not affect the others; moreover, since this is an async job, it will be retried and can actually be configured to retry with different delays
  • each sub process instance is carried within its own transaction, which is much smaller and finishes way faster (almost no risk of encountering transaction timeouts), with much less data to be sent to the database - just one instance (and session in case of the per process instance strategy)
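The async variant can be sketched in plain Java with java.util.concurrent. This is only a simulation of the executor's behavior, not the jBPM executor API, and all names in it are illustrative: each subprocess start becomes an independent, retryable job, so one failure neither blocks nor rolls back the others.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncStartSketch {

    static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
    static final AtomicInteger started = new AtomicInteger();

    // One job per subprocess start; on failure it reschedules itself with a delay,
    // without touching the other jobs (each has its own small "transaction").
    static void scheduleStart(int item, int retriesLeft) {
        executor.submit(() -> {
            try {
                if (item == 3 && retriesLeft == 2) {
                    throw new IllegalStateException("simulated failure on first attempt");
                }
                started.incrementAndGet(); // this job's own commit
            } catch (RuntimeException e) {
                if (retriesLeft > 0) { // retried later with a delay, like the executor's retry config
                    executor.schedule(() -> scheduleStart(item, retriesLeft - 1),
                                      100, TimeUnit.MILLISECONDS);
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        for (int item = 0; item < 5; item++) {
            scheduleStart(item, 2); // the "main process" only enqueues job requests and moves on
        }
        while (started.get() < 5) {
            Thread.sleep(50); // wait until every job has committed, including the retried one
        }
        executor.shutdown();
        System.out.println("subprocesses started: " + started.get());
    }
}
```

Note how the loop that enqueues the five jobs returns immediately - that is the "main process" part of the transaction, while each start happens (and, if needed, retries) on its own.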

That concludes the main use case here. There is, though, one additional aspect that in normal processing will cause issues - a single parent process instance that must be notified by a huge number of child process instances, which can happen at pretty much the same time. That will cause concurrent updates to the same process instance, which will result in an optimistic lock exception (the famous StaleObjectStateException). That is expected, and the process engine can cope with it to some extent - by using a retry interceptor in case of optimistic lock exceptions. With too many concurrent updates, though, some of them may exceed the retry count and fail to notify the process instance. Besides that, each such failure will cause errors to be printed to the logs, which can reduce visibility in the logs and trigger alerts in production systems.
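The optimistic lock problem and the retry idea can be illustrated with a small, self-contained simulation. This is plain Java, not the engine's actual retry interceptor; the atomic counter stands in for the JPA row version on the parent process instance.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OptimisticRetrySketch {

    static final AtomicInteger version = new AtomicInteger();   // stands in for the JPA row version
    static final AtomicInteger delivered = new AtomicInteger();

    // One notification attempt: read the version, do the update, commit only if the
    // version is unchanged; on conflict, retry a limited number of times.
    static boolean signalWithRetry(int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            int seen = version.get();
            // ... the update of the parent process instance would happen here ...
            if (version.compareAndSet(seen, seen + 1)) {
                return true; // committed
            }
            // conflict = the StaleObjectStateException case: someone else committed first
        }
        return false; // retry count exceeded - this notification is lost
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] children = new Thread[50];
        for (int i = 0; i < children.length; i++) {
            children[i] = new Thread(() -> {
                if (signalWithRetry(3)) {
                    delivered.incrementAndGet();
                }
            });
            children[i].start();
        }
        for (Thread t : children) {
            t.join();
        }
        // With enough concurrency, some signals can exhaust their retries
        System.out.println("delivered " + delivered.get() + " of " + children.length);
    }
}
```

Most of the 50 concurrent "children" get through after a retry or two, but nothing guarantees all of them do - which is exactly the failure mode described above.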

So how to deal with this?
The idea is to skip the regular notification mechanism that directly calls the parent process instance, to avoid concurrent updates, and instead use signal events (catch in the main process instance and throw in the subprocess instance).
Main process catch signal intermediate event
Sub process throw signal end event
But the use of signal catch and throw events does not solve the problem by itself. The game changer is the scope of the throw event, which allows the so called 'External' scope that utilizes JMS messaging to deliver the signal from the child to the parent process instance. Since the main process instance uses a multi instance subprocess to create child process instances, there will be multiple catch signal events (the same number as sub process instances) waiting for notification.
Because of that, the signal name cannot be a constant, as the first signal from a sub process instance would trigger all catch events and thereby finish the multi instance too early.

To support this case, signal names must be dynamic - based on a process variable. Let's enumerate the steps these two processes go through when executed:
  • main process: upon start, it will create the given number of subprocess activities that will each start a new process instance (a child process instance)
  • main process: upon requesting the sub process instance creation (regardless of whether it's via call activity or an async task), it will pass a signal name built from a constant plus a unique item (serialized-#{item}) that represents a single entry from the multi instance input collection
  • main process: it will then move on to an intermediate catch signal event whose name is again the same as the one given to the sub process (child), and wait for it (serialized-#{item})
  • sub process: after executing its work, it will throw an event via an end signal event with the signal name given as an input parameter when it was started (serialized-#{item}), using the external scope, so it will be sent via JMS in a transactional way - delivered only when the subprocess completes (and commits) successfully
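The steps above can be sketched with a minimal simulation of the signal routing. The latch map below is illustrative and not jBPM API: each multi instance entry gets its own "serialized-#{item}" signal name, so a child's end signal releases exactly one waiting catch event instead of all of them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class DynamicSignalSketch {

    // One waiting catch event per unique signal name (stand-in for the engine's routing)
    static final Map<String, CountDownLatch> waitingCatchEvents = new HashMap<>();

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c"); // multi instance input collection

        // main process: register one catch event per item, each with a unique signal name
        for (String item : items) {
            waitingCatchEvents.put("serialized-" + item, new CountDownLatch(1));
        }

        // sub process "a" completes and throws its end signal with the name it was given
        String signalName = "serialized-a";
        waitingCatchEvents.get(signalName).countDown();

        // only the matching catch event is released; the others keep waiting
        for (String item : items) {
            boolean released = waitingCatchEvents.get("serialized-" + item).getCount() == 0;
            System.out.println("serialized-" + item + " released: " + released);
        }
    }
}
```

With a constant signal name, that single countDown would have to wake every waiting catch event at once - which is exactly the premature multi instance completion described above.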

The external scope for throw signal events is backed by a WorkItemHandler for pluggability reasons, so it can be realized in many ways, not only the default JMS way. That said, JMS provides a comprehensive messaging infrastructure that is configurable and cluster aware. To completely solve the problem - concurrent updates to the parent process instance - we need to configure the receiver of the signals accordingly. The configuration boils down to a single property - the activation specification property that limits the number of sessions for a given endpoint.
In JBoss EAP/WildFly it can be given as a simple entry in the configuration of the MDB defined in the workbench/jbpm console:

In the default installation, the signal receiver MDB does not limit concurrent processing and looks like this (WEB-INF/ejb-jar.xml):

  <message-driven>
    <ejb-name>JMSSignalReceiver</ejb-name>
    <ejb-class>org.jbpm.process.workitem.jms.JMSSignalReceiver</ejb-class>
    <transaction-type>Bean</transaction-type>
    <activation-config>
      <activation-config-property>
        <activation-config-property-name>destinationType</activation-config-property-name>
        <activation-config-property-value>javax.jms.Queue</activation-config-property-value>
      </activation-config-property>
      <activation-config-property>
        <activation-config-property-name>destination</activation-config-property-name>
        <activation-config-property-value>java:/queue/KIE.SIGNAL</activation-config-property-value>
      </activation-config-property>
    </activation-config>
  </message-driven>
To enable serialized processing, the MDB configuration should look like this:

  <message-driven>
    <ejb-name>JMSSignalReceiver</ejb-name>
    <ejb-class>org.jbpm.process.workitem.jms.JMSSignalReceiver</ejb-class>
    <transaction-type>Bean</transaction-type>
    <activation-config>
      <activation-config-property>
        <activation-config-property-name>destinationType</activation-config-property-name>
        <activation-config-property-value>javax.jms.Queue</activation-config-property-value>
      </activation-config-property>
      <activation-config-property>
        <activation-config-property-name>destination</activation-config-property-name>
        <activation-config-property-value>java:/queue/KIE.SIGNAL</activation-config-property-value>
      </activation-config-property>
      <activation-config-property>
        <activation-config-property-name>maxSession</activation-config-property-name>
        <activation-config-property-value>1</activation-config-property-value>
      </activation-config-property>
    </activation-config>
  </message-driven>

That ensures that all messages (even if they are sent concurrently) will be processed serially. Notifying the parent process instance in a non-concurrent way ensures that all notifications will be delivered and will not cause conflicts - concurrent updates on the same process instance.
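What maxSession=1 buys can be shown with a one-thread consumer. This is plain Java standing in for the MDB, not the MDB itself: signals drained serially never race on the parent's row version, so every optimistic update commits on the first attempt.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedReceiverSketch {

    static final AtomicInteger version = new AtomicInteger();   // parent instance row version
    static final AtomicInteger conflicts = new AtomicInteger(); // would-be StaleObjectStateExceptions

    public static void main(String[] args) throws InterruptedException {
        // maxSession=1 equivalent: a single consumer thread drains the signal queue
        ExecutorService singleSession = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            singleSession.submit(() -> {
                int seen = version.get();
                // serial processing: nobody can commit between this read and write,
                // so the optimistic update always succeeds on the first attempt
                if (!version.compareAndSet(seen, seen + 1)) {
                    conflicts.incrementAndGet();
                }
            });
        }
        singleSession.shutdown();
        singleSession.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("signals=" + version.get() + " conflicts=" + conflicts.get());
    }
}
```

The trade-off is throughput on the receiving side: with one session, signals are delivered one after another, which is exactly what removes the conflicts.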

With that we have a fully featured solution that deals with a complex process requiring high throughput and asynchronous processing. So now it's time to see what results we can expect from execution, and whether the different versions of the main process differ in execution times.

Sample execution results

The following table presents sample execution results of the described process. Numbers may differ between environments, and anyone is more than welcome to give it a try and report back how it actually performed.


                                                                      100 instances   300 instances   500 instances
Call Activity with JMS executor                                       7 sec           24 sec          41 sec
Async Start Task with JMS executor                                    4 sec           21 sec          28 sec
Call Activity with polling executor (1 thread, 1 sec interval)        1 min 44 sec    5 min 11 sec    8 min 44 sec
Async Start Task with polling executor (1 thread, 1 sec interval)     3 min 21 sec    10 min          17 min 42 sec
Call Activity with polling executor (10 threads, 1 sec interval)      17 sec          43 sec          2 min 13 sec
Async Start Task with polling executor (10 threads, 1 sec interval)   20 sec          1 min 2 sec     1 min 41 sec

Conclusions:

As you can see, JMS based processing is extremely fast compared to polling based processing alone. In fact, the fastest variant uses the async start process command for starting child process instances, and the difference increases with the number of sub process instances to be created.
On the other hand, using the polling based executor with the async start process command is the slowest, and that is expected as well, as all start process commands are still handled by the polling executor, which does not run fast enough.
In all cases, all processing completed successfully, but the time required to complete it differs significantly.


If you're willing to try this yourself, just download the 6.3.0 version of the jBPM console (aka kie-wb) and then clone this repository into your environment. Once you have that in place, go to the async-perf project, then build and deploy it. Once it's deployed successfully, you can play around with async execution:
  • miprocess-async is the main process that uses the async start process command to start child process instances
  • miprocess is the main process that uses call activity to start child process instances
In both cases, upon start you'll be asked for the number of subprocesses to create. Just pick a number and run it!

Note that by default the JMS receiver will receive signals concurrently, so unless you reconfigure it you'll see concurrent updates to the parent process failing for some requests.

Have fun! Comments and result reports are welcome.


10 comments:

  1. Hi Maciej,

    Great article, as always !!

    I have a very common scenario and believe you might also have come across this. We have a clustered WebSphere environment with 2 nodes. The same application is deployed in 2 nodes. Thus, both nodes have Spring's JMS Listener Container deployed and they are registered as consumers.

    Now, from WebSphere's perspective, there are 2 consumers, and whenever a signal is emitted externally, it posts one JMS message, which in turn is sent to both consumers; they process it simultaneously, causing an OptimisticLockException.

    Running the listener in one node is not good in a cluster.

    Can you please advise how to tackle this?

    Thanks,
    Anindya

    Replies
    1. As far as I know, it's not possible for a single message to be processed (and by that, received) by several listeners/consumers. That would go against the queue concept, where a single message can be delivered to a single consumer only. Make sure your consumers are properly configured and that your JMS provider ensures a single message is not delivered to multiple endpoints - maybe you need to change or fine tune your transaction behavior.

  2. Hi Maciej,

    Just to update you on this. I fixed this problem by using the recommended DisabledFollowOnLockOracle10gDialect for Hibernate and also adding the indexes. Somehow Hibernate was not creating the DDLs automatically. After adding the indexes and that dialect, the application is now free of lock acquisition exceptions.

    Thanks,
    Anindya

  3. Hi Maciej,

    Can you please suggest how the parent-child relationship is achieved using the Async Start process? It is set automatically when using Call Activity, but not when using Async Start process.

    Please guide on this.

    Thanks,
    Anindya

  4. the async start process command does not make this association, so you would have to do it yourself, e.g. via a process variable

  5. Hey Maciej,

    I am new to this jBPM world.
    And the catch is I have to do a similar thing, just with Vert.x 3.3.x.

    I need your help in doing that.

    Thanks,
    Rishi Sharma

  6. Hi,

    Do you have any example with event based subprocess?

  7. yes, you need to configure it in KIE Server, as it was not there in 6.3:
    https://github.com/droolsjbpm/droolsjbpm-integration/blob/6.5.x/kie-server-parent/kie-server-wars/kie-server/src/main/ee7-resources/META-INF/kie-server-jms.xml#L58-L64

    and
    https://github.com/droolsjbpm/droolsjbpm-integration/blob/6.5.x/kie-server-parent/kie-server-wars/kie-server/src/main/shared-ee6-ee7-resources/WEB-INF/ejb-jar.xml

  8. Hi Maciej,
    Added the above changes in both kie-server-jms.xml and ejb-jar.xml,
    but I'm still getting the exception while executing the miprocess-async bpmn file.

    Are there any further changes required? I am sharing the exception below.

    In AsyncStartProcessCommand I get an exception like "No runtime manager found for deployment id org.jbpm.test:async-perf:1.0"

    2017-02-08 23:25:00,209 INFO [org.apache.deltaspike.core.util.ProjectStageProducer] (default task-33) Computed the following DeltaSpike ProjectStage: Production
    2017-02-08 23:25:00,528 INFO [org.jbpm.formModeler.service.bb.mvc.controller.ControllerServlet] (default task-39) Application Directory: C:/Programs/wildfly-8.2.0.Final/standalone/tmp/vfs/temp/temp4accc0d7014b2411/kie-wb.war-3f9b2f7d9c11a519
    2017-02-08 23:25:00,535 INFO [org.jbpm.formModeler.service.bb.mvc.controller.ControllerServlet] (default task-39) Application Config Directory: C:/Programs/wildfly-8.2.0.Final/standalone/tmp/vfs/temp/temp4accc0d7014b2411/kie-wb.war-3f9b2f7d9c11a519/WEB-INF/etc
    2017-02-08 23:25:05,636 INFO [org.jbpm.process.workitem.jms.JMSSendTaskWorkItemHandler] (default task-56) JMS based work item handler successfully activated on destination HornetQQueue[KIE.SIGNAL.QUEUE]
    2017-02-08 23:25:05,808 INFO [stdout] (default task-56) Creating MI list - number of loops 2

    2017-02-08 23:25:06,447 WARN [org.jbpm.executor.impl.AbstractAvailableJobsExecutor] (Thread-14 (HornetQ-client-global-threads-690529779)) Error during command org.jbpm.process.core.async.AsyncStartProcessCommand error message No runtime manager found for deployment id org.jbpm.test:async-perf:1.0: java.lang.IllegalArgumentException: No runtime manager found for deployment id org.jbpm.test:async-perf:1.0
    at org.jbpm.process.core.async.AsyncStartProcessCommand.execute(AsyncStartProcessCommand.java:62) [jbpm-flow-6.3.0.Final.jar:6.3.0.Final]
    at org.jbpm.executor.impl.AbstractAvailableJobsExecutor.executeGivenJob(AbstractAvailableJobsExecutor.java:114) [jbpm-executor-6.3.0.Final.jar:6.3.0.Final]
    at org.jbpm.executor.impl.jms.JmsAvailableJobsExecutor.onMessage(JmsAvailableJobsExecutor.java:47) [jbpm-executor-6.3.0.Final.jar:6.3.0.Final]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [rt.jar:1.7.0_79]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) [rt.jar:1.7.0_79]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_79]
    at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_79]
    at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52)
    at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
    at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
    at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
    at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)



    Thanks
    Anil Kumar

    Replies
    1. Looks like you don't have a container with id org.jbpm.test:async-perf:1.0 deployed to your KIE Server, so make sure you have that container deployed, otherwise it won't be found at runtime. This looks more like a deployment id from the workbench than a container id from KIE Server...
