Tuesday, 20 August 2013

Make your work asynchronous

Asynchronous execution as part of a business process is a common requirement. jBPM has long supported it through custom implementations of WorkItemHandler. In general, it was as simple as providing an async handler (is it as simple as it sounds?) that delegates the actual work to some worker, e.g. a separate thread that proceeds with the execution.

Before we dig into the details of jBPM v6 support for asynchronous execution, let's look at the common requirements for such execution:

  • first and foremost, it allows asynchronous execution of a given piece of business logic
  • it allows retries when resources are temporarily unavailable, e.g. during interaction with an external system
  • it allows errors to be handled once all retries have been attempted
  • it provides a cancellation option
  • it provides a history log of the execution
When we confront these requirements with the "simple async handler", we immediately notice that all of them would need to be implemented all over again by every system. That is not so appealing, is it?
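
To make that cost concrete, here is a minimal sketch of the bookkeeping a hand-rolled async handler would need just for retries, cancellation and final error handling. It uses only java.util.concurrent; the names NaiveAsyncWorker and submitWithRetries are hypothetical and are not part of jBPM.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of jBPM: runs work on a background thread
// and retries it a bounded number of times before reporting the failure.
final class NaiveAsyncWorker {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    <T> CompletableFuture<T> submitWithRetries(Callable<T> work, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        pool.submit(() -> {
            Exception last = null;
            // One initial attempt plus up to maxRetries further attempts.
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                if (result.isCancelled()) {
                    return; // the caller cancelled the job
                }
                try {
                    result.complete(work.call());
                    return;
                } catch (Exception e) {
                    last = e; // e.g. an external system is temporarily unavailable
                }
            }
            // All attempts failed: hand the last error to the caller.
            result.completeExceptionally(last);
        });
        return result;
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Even this sketch has no history log and no persistence, so jobs would be lost on restart, which is the kind of work each system would otherwise have to repeat.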

jBPM executor to the rescue 

Since version 6, jBPM includes a new component called the jbpm executor, which provides quite advanced features for asynchronous execution. It delivers a generic environment for background execution of commands. Commands are nothing more than business logic encapsulated behind a simple interface. A command does not have any process runtime related information, which means there is no need to complete work items or anything of that sort. It focuses purely on the business logic to be executed. It receives data via CommandContext and returns the results of the execution as ExecutionResults. The most important rule for both input and output data is that it must be serializable.
The executor covers all the requirements listed above and provides a user interface as part of the jbpm console and kie workbench (kie-wb) applications.
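
The shape of a command can be sketched as follows. The nested types below are simplified stand-ins written for this illustration; they only borrow the names CommandContext and ExecutionResults from the text and are not the jBPM classes, whose packages and exact methods should be taken from the jBPM API. GreetCommand and isSerializable are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; NOT the jBPM classes.
final class CommandSketch {

    // Input data handed to the command.
    static final class CommandContext implements Serializable {
        private final Map<String, Object> data = new HashMap<>();
        void setData(String key, Object value) { data.put(key, value); }
        Object getData(String key) { return data.get(key); }
    }

    // Output data returned by the command.
    static final class ExecutionResults implements Serializable {
        private final Map<String, Object> data = new HashMap<>();
        void setData(String key, Object value) { data.put(key, value); }
        Object getData(String key) { return data.get(key); }
    }

    interface Command {
        ExecutionResults execute(CommandContext ctx) throws Exception;
    }

    // Pure business logic: no work items and no process runtime involved.
    static final class GreetCommand implements Command {
        @Override
        public ExecutionResults execute(CommandContext ctx) {
            ExecutionResults results = new ExecutionResults();
            results.setData("greeting", "Hello " + ctx.getData("name"));
            return results;
        }
    }

    // Checks the "must be serializable" rule by actually serializing the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // includes NotSerializableException
        }
    }
}
```

Putting a non-serializable value into the context makes isSerializable return false, which is the situation the serialization rule is meant to prevent.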

[Screenshot: Jobs panel in the kie-wb application]

The screenshot above shows the history view of the executor's job queue. As it shows, several options are available:
  • view details of the job
  • cancel a given job
  • create a new job
With that, quite a few things can already be achieved. But what about executing logic as part of a process instance, via a work item handler?

Async work item handler

jBPM (again, since version 6) provides an out-of-the-box async work item handler that is backed by the jbpm executor. So by default, all the features the executor delivers are available for background execution within a process instance. AsyncWorkItemHandler can be configured in two ways:
  1. as a generic handler that expects to get the command name as part of the work item parameters
  2. as a specific handler for a given type of work item, for example a web service
Option 1 is configured by default for the jbpm console and kie-wb web applications, and is registered under the name async in every ksession that is bootstrapped within those applications. So whenever there is a need to execute some logic asynchronously, the following needs to be done at modeling time (using the jbpm web designer):
  • specify async as the TaskName property
  • create a data input called CommandClass
  • assign the fully qualified class name of the command to the CommandClass data input
Then complete the process model in the regular way. Note that all data inputs will be transferred to the executor, so they must be serializable.
[Screenshot: assignments for an async node (web service execution)]

The second option allows different instances of AsyncWorkItemHandler to be registered for different work items. Since the handler is registered for a dedicated work item, the command will most likely be dedicated to that work item as well. If so, CommandClass can be specified at registration time instead of being set as a work item parameter. To register such handlers for the jbpm console or kie-wb, an additional class is required to tell the application what should be registered: a CDI bean that implements the WorkItemHandlerProducer interface needs to be provided and placed on the application classpath so that the CDI container can find it. Then, at modeling time, the TaskName property needs to match the name used at registration time.
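
The difference between the two configuration styles comes down to where the command class name is taken from. The following is a hypothetical sketch, not the AsyncWorkItemHandler source: CommandNameResolver is an invented name, and the precedence shown (work item parameter first, registration-time value as fallback) is an assumption made for illustration.

```java
import java.util.Map;

// Hypothetical sketch, not the AsyncWorkItemHandler source: shows how one
// handler class could serve both registration styles described above.
final class CommandNameResolver {
    // Command fixed at registration time; null for the generic handler.
    private final String registeredCommand;

    CommandNameResolver() {
        this(null);
    }

    CommandNameResolver(String registeredCommand) {
        this.registeredCommand = registeredCommand;
    }

    String resolve(Map<String, Object> workItemParameters) {
        // Option 1: the process model supplies a CommandClass data input.
        Object fromParams = workItemParameters.get("CommandClass");
        if (fromParams instanceof String && !((String) fromParams).isEmpty()) {
            return (String) fromParams;
        }
        // Option 2: fall back to the command given when the handler was registered.
        if (registeredCommand != null) {
            return registeredCommand;
        }
        // Neither source provided a command: surface this as an error.
        throw new IllegalArgumentException("No command class configured");
    }
}
```

A generic instance created with new CommandNameResolver() depends entirely on the CommandClass parameter, while an instance created with a class name works even when the model does not set it.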

Ready to give it a try?

To see this working, just try the latest kie-wb or jbpm console build (either master or CR2). As soon as the application is deployed, go to the Authoring perspective and you'll find an async-examples project in the jbpm-playground repository. It comes with three samples that illustrate asynchronous execution from within a process instance:
  • async executor
  • async data executor
  • check weather
Async executor is the simplest process that executes commands asynchronously. When starting a process instance, it will ask for the fully qualified class name of the command; for demo purposes use org.jbpm.executor.commands.PrintOutCommand, which is similar to SystemOutWorkItemHandler and simply prints the content of the CommandContext to the logs. You can leave the field empty or provide an invalid command class name to see the error handling mechanism (using a boundary error event).

Async data executor is pretty much the same as Async executor, but it operates on custom data (included in the project: User and UserCommand). On the start process form, use org.jbpm.examples.cmd.UserCommand to invoke the custom command included in the project.

Check weather is an asynchronous execution of a web service call. It checks the weather for any U.S. zip code and provides the results as a human task. So on the start form, specify who should receive the user task with the results and the zip code of the city you would like a weather forecast for.


[Screenshot: starting the Check weather process with async web service execution]


And that's it, asynchronous execution is now available out of the box in jBPM v6. 

Have fun and as usual keep the comments coming so we can add more useful features!

39 comments:

  1. Hi Maciej,
    these are very nice and useful demos.

    But how could I create input(Parameter) for more complicated complexType?

    For example for the GetCityForecastByZIPAndCity operation with two strings.
    <s:element name="GetCityForecastByZIPAndCity">
    <s:complexType>
    <s:sequence>
    <s:element minOccurs="0" maxOccurs="1" name="ZIP" type="s:string" />
    <s:element minOccurs="0" maxOccurs="1" name="City" type="s:string"/>
    </s:sequence>
    </s:complexType>
    </s:element>

    Do I have to use the wsconsume utility?
    If yes then it will lose dynamic method of calling web service.

    Or.
    Should I create the input object by example on the http://cxf.apache.org/docs/dynamic-clients.html site?
    There is the ComplexClient.java class with dynamically created input object.

    Have a nice day
    Miloslav Havrda from Czech republic.

  2. Miloslav,
    it's up to the application to deliver what is actually needed at runtime. As far as I know, DynamicClient will generate classes only when they are not available on the class path. So you can use the WSDL upfront to generate the data model that will be needed and place it on the application class path, and then create instances of that object before you enter the node that makes use of it.
    Alternatively, you can let the command that will be executed as an async job do the heavy work. In these examples I use WebServiceCommand, which by default uses a single parameter for service invocation. If that is not what is required, you can provide an extension of this command or even a completely new command.

  3. Hello Maciej,
    I have downloaded jBPM version 6.0.1 Final from http://sourceforge.net/projects/jbpm/files/jBPM%206/jbpm-6.0.1.Final/ site.

    I have started console http://localhost:8080/jbpm-console and opened async executor business process.

    I have tried to validate the Async executor project from clean install.demo.
    Validation says Task node 'Task 1'[4] has no work name.

    I have tried to change a name from Task 1 to Task one.
    After change the validation says Task node 'Task one'[4] has no work name.

    I have tried to deploy project.
    Build and deploy failed.
    I see this error in the Problems window.
    Process 'Simple async execution Process' [AsyncExecution]: Task node 'Task one' [4] has no work name.
    async executor.bpmn2

    If I install a clean demo and do nothing with async-examples, then async-examples can be deployed successfully.

    Do you have any ideas how to solve it?

  4. Miloslav, async needs to be added to the wid file of the project. I just updated the one that is loaded on application startup, so it will now be possible to create new processes with an async node in the async-examples project.
    If you would like to use it in another process, you can add the async work item definition to the wid file yourself; take a look at the example here: https://github.com/guvnorngtestuser1/jbpm-console-ng-playground-kjar/blob/master/async-examples/src/main/resources/WorkDefinitions.wid#L60

  5. Hello Maciej,
    thank you very much. Now it works.

    I have another question about the Check weather business process.

    There is the Result variable of String type in the async task.
    This variable is filled by the com.cdyne.ws.weatherws.WeatherReturn type.
    Why doesn't it throw a ClassCastException?

    I have tested this variable on the Integer type.
    The same result. No exception.

    Regards
    Mila

  6. This could be a known issue: after redeploy, class loaders are not properly cleaned up and the old one is still hanging around. I'll be working on this case soon, so I will update here when it's fixed.

    Replies
    1. It turned out that the problem was only in the REST API, which was incorrectly caching JAXBContexts and thus failing after redeploy of the project. Can you give it a try with master to check that all issues are resolved?

  7. This comment has been removed by the author.

  8. Hi Maciej,
    I have triggered 5 async jobs from my BPMN. And i can see all these jobs in jobs perspective of kie-wb.
    However, only one job was running and all other were queued.
    My executor setting was - Thread Pool Size 10, Interval 3s.
    Even after 1 min, second job was not picked up by executor.
    Only one job was running and all other were queued.
    Why is it so?
    https://community.jboss.org/thread/238005

    Thanks,
    Rahul

    Replies
    1. Rahul, it looks like the issue is with the executor service used in the jbpm executor: we use a ScheduledExecutorService and schedule a single task on that service to fire every 3 seconds by default, so it does not take advantage of the full thread pool, which limits its throughput. Please file a jira and I'll fix it quickly; just assign it to me.
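
The limitation described in this reply can be reproduced with plain java.util.concurrent. The following is a minimal sketch, not the jBPM executor code and not the actual fix: PollerThroughputSketch and peakConcurrency are hypothetical names, and the 50 ms sleep stands in for a running job. A single periodic task never overlaps with itself, so only one job runs at a time regardless of pool size; scheduling one periodic task per thread lets several jobs run in parallel.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: measures how many simulated jobs run at the same time.
final class PollerThroughputSketch {

    static int peakConcurrency(int poolSize, int pollers, long runMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Runnable job = () -> {
            // Record the highest number of jobs seen running together.
            peak.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(50); // simulated job execution
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };

        // Each scheduled task acts as one poller; a task never overlaps itself.
        for (int i = 0; i < pollers; i++) {
            pool.scheduleWithFixedDelay(job, 0, 10, TimeUnit.MILLISECONDS);
        }
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

With a pool of 10 threads, peakConcurrency(10, 1, 300) reports 1, while peakConcurrency(10, 10, 300) reports a value greater than 1.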

  9. Hi Maciej,
    Thanks for reply.
    I found Ritu has filed JIRA for same -
    https://issues.jboss.org/browse/JBPM-4275

    Thanks,
    Rahul

  10. Hi Maciej,
    Thanks for quick enhancement in jbpm executor
    https://issues.jboss.org/browse/JBPM-4275
    https://community.jboss.org/thread/238005


    Thanks,
    Rahul

  11. Hi Maciej,
    I am able to execute async tasks using KIE-WB.
    But I want to execute same way in my java project in eclipse.
    I started executor service by -

    EntityManagerFactory emf = Persistence.createEntityManagerFactory("PersistenceUnit");
    executorService = ExecutorServiceFactory.newExecutorService(emf);
    executorService.setThreadPoolSize(10);
    executorService.setInterval(3);
    executorService.setRetries(3);
    executorService.init();
    System.out.println("Job Executor Started : " + executorService.isActive());
    .
    Now I want to run a BPMN which contains async tasks.
    When I start such BPMN,
    I get exception -
    org.jbpm.workflow.instance.WorkflowRuntimeException: [org.jbpm.examples.bpmn.async_test_01:1 - async:2] -- Could not find work item handler for async.

    I found we have scheduleRequest() method also for running Commands.
    But when I start any BPMN by ksession.startProcess()
    How async tasks in BPMN will be executed?

    https://community.jboss.org/thread/237808

    Can you give me one good example of using jbpm executor service without using KIE-WB?

    Thanks,
    Rahul

  12. This comment has been removed by the author.

  13. Hi,
    I resolved above mentioned problem by overriding getWorkItemHandlers method-

    @Override
    public Map<String, WorkItemHandler> getWorkItemHandlers(RuntimeEngine runtime) {
        Map<String, WorkItemHandler> defaultHandlers = new HashMap<String, WorkItemHandler>();
        // HT handler
        WorkItemHandler handler = getHTWorkItemHandler(runtime);
        defaultHandlers.put("Human Task", handler);
        // add any custom registered
        defaultHandlers.putAll(super.getWorkItemHandlers(runtime));
        // async handler backed by the executor service started earlier
        defaultHandlers.put("async", new AsyncWorkItemHandler(executorService, "org.jbpm.examples.cmd.UserCommand"));
        return defaultHandlers;
    }

    Thanks,
    Rahul

  14. Glad you found it; you're faster at solving issues than I am at replying to your comments :) Keep up the good work!

  15. This comment has been removed by the author.

  16. This comment has been removed by the author.

  17. Based on the return status, i need to take next step of the process.

  18. Hi,

    I am a first-time jBPM 6 user. I want to call a JAX-WS web service from the business process. The web service acknowledges immediately. After a few hours of processing, it returns a status of Success/Failure. Based on the returned status, the next process step will be executed.

    what is the best approach to implement this Async invocation?
    Please advise.

  19. Hi, thank you for the great tutorial. I do see the asyncexecutor in the list of service tasks when I open the sample using the jbpm console; however, when I add a new business process, I do not see it. Any idea what I am missing?

  20. Hi,
    I am using KieWorkBench 6.0.1 with jboss as 7.1.1.Final.
    I do the following steps:

    1. start jboss
    2. deploy MyProcess, which uses AsyncWorkItemHandler
    3. start instance of MyProcess
    -> ok!
    4. shut down jboss
    5. start jboss
    6. start another instance of MyProcess, which leads to the following exception:
    ->
    ERROR [org.jbpm.process.instance.timer.TimerManager] (Thread-85) Error when executing timer job: org.drools.core.RuntimeDroolsException: Unexpected exception executing action org.jbpm.process.instance.event.DefaultSignalManager$SignalProcessInstanceAction@18093947
    at org.drools.core.common.AbstractWorkingMemory.executeQueuedActions(AbstractWorkingMemory.java:1246) [drools-core-6.0.1.Final.jar:6.0.1.Final]
    ...
    Caused by: org.jbpm.workflow.instance.WorkflowRuntimeException: [de.messagemobile.jbpm.sample.multipleinstances:32 - async:9] -- Could not find work item handler for async
    ...
    Caused by: org.drools.core.WorkItemHandlerNotFoundException: Could not find work item handler for async
    at org.drools.persistence.jpa.processinstance.JPAWorkItemManager.throwWorkItemNotFoundException(JPAWorkItemManager.java:63) [drools-persistence-jpa-6.0.1.Final.jar:6.0.1.Final]
    at org.drools.persistence.jpa.processinstance.JPAWorkItemManager.internalExecuteWorkItem(JPAWorkItemManager.java:58) [drools-persistence-jpa-6.0.1.Final.jar:6.0.1.Final]
    at org.jbpm.workflow.instance.node.WorkItemNodeInstance.internalTrigger(WorkItemNodeInstance.java:124) [jbpm-flow-6.0.1.Final.jar:6.0.1.Final]
    at org.jbpm.workflow.instance.impl.NodeInstanceImpl.trigger(NodeInstanceImpl.java:155) [jbpm-flow-6.0.1.Final.jar:6.0.1.Final]
    ... 38 more

    Any idea?

    Greetings!
    Claudia

  21. Actually no idea; that should work the same way as it did with the first invocation. Do you mind trying it with 6.1.0.Final or even 6.2.0.CR2?

    If that does not work, please file a jira with a reproducer, for example your project or process.

  22. Ok, I'll try it with a newer version.
    Thanks for your reply.

  23. Hi Maciej,

    how can I specify executor service when defining the handlers in workItemsHandlers file like this:

    drools.session.conf:
    drools.workItemHandlers = CustomWorkItemHandlers.conf

    CustomWorkItemHandlers.conf:
    [
    "AsyncREST" : new AsyncWorkItemHandler(?executorService?, "bpm.sample.RestCommand")
    ]

  24. Dusan,

    unfortunately you cannot do it via the conf file, but you can do it via the deployment descriptor using mvel as the resolver type (available in 6.1 and above):

    mvel
    new org.jbpm.executor.impl.wih.AsyncWorkItemHandler(org.jbpm.executor.ExecutorServiceFactory.newExecutorService(null))


    this assumes you run on kie-wb/jbpm-console, so the executor service will already be active and can be fetched easily without providing the entity manager factory.

    Replies
    1. here is the XML-based content for the deployment descriptor:
      http://pastebin.com/wc8F8nfE

    2. Sorry, I think I am still missing something. In the conf file there are these things specified:

      "AsyncREST" : new AsyncWorkItemHandler(?executorService?, "bpm.sample.RestCommand")

      AsyncREST - the task name to be used in the process. I cannot see it in your example, but I see there is a section in the deployment descriptor, so I assume it somehow has to go there and reference the marshalling strategies.

      "bpm.sample.RestCommand" - the name of the command class to pass to the AsyncWorkItemHandler. Actually in your example you omit the second parameter which I am unsure if it is correct/intended.

      I appreciate if you can provide an example with the complete deployment-descriptor that binds the task name, command class, AsyncWorkItemHandler and the executor service.

      Thanks!

    3. Sorry, I provided you with a marshalling strategy example while it should have been a work item handler. Check this file for details:
      https://github.com/droolsjbpm/kie-wb-distributions/blob/master/kie-wb/kie-wb-webapp/src/main/resources/META-INF/kie-wb-deployment-descriptor.xml#L12

      just replace the mvel expression with this:
      new org.jbpm.executor.impl.wih.AsyncWorkItemHandler(org.jbpm.executor.ExecutorServiceFactory.newExecutorService(null))

      and that should do the trick (I think ;))

    4. Thanks that worked! Here's a gist with the work-item-handler definition in case anyone needs it: https://gist.github.com/chromyd/26146f057936ce358e63

  25. How did you add the UserCommand class in kie-wb? I'm not able to see any dependencies being imported in the async examples. If I have to add my own command class, how do I add it?

    Replies
    1. you can add it via git or by (ab)using the data object editor: create a POJO and then replace it with the command. But I'd recommend using the git integration to add your source code.

    2. i did add jar to dependencies.

  26. @Maciej, I tried the first option for async, but it still behaves in a sync manner. I have a start node, an async node, a script node and an end node. After I start the process, it comes to the async node, completes the task, and then moves to the script node, which should not be the case. Could you please advise what the problem could be here?

    Replies
    1. Why do you think the async node should not be completed? It is the same as it would be with sync, but it completes in another thread. Other than that, nothing differs.

    2. okay. if the work is handled by another thread, then the main thread should proceed with the next activity,is it not so? if main thread waits for the another thread to complete work, then how is it async? is my understanding wrong?

    3. yes, your understanding is a bit wrong: since you have a sequence defined in your process (tasks following each other), async means that the process instance is put into a wait state before the async task and is then resumed directly by the async task in another thread. That async thread then continues with all the following sync activities. So the sequence of tasks is kept, but a wait state is introduced because of the async activity.
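
The behaviour described in this reply can be sketched with plain java.util.concurrent. This is only an illustration with hypothetical names (AsyncSequenceSketch, record), not jBPM code: the caller hands the rest of the path to a worker and returns, and the worker then runs the async step and the following sync steps in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of "wait state, then resume on another thread".
final class AsyncSequenceSketch {

    static List<String> run() {
        final Thread caller = Thread.currentThread();
        final List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch callerReturned = new CountDownLatch(1);

        record(log, caller, "start");
        // The "async node": the remaining path is handed to the worker thread.
        worker.submit(() -> {
            try {
                // Used only to make the log order deterministic in this demo.
                callerReturned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            record(log, caller, "async");
            record(log, caller, "script"); // following sync node, same worker thread
            record(log, caller, "end");
        });
        // The caller is free again while the instance waits for the worker.
        record(log, caller, "returned");
        callerReturned.countDown();

        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    // Tags each step with the kind of thread that executed it.
    private static void record(List<String> log, Thread caller, String step) {
        String who = Thread.currentThread() == caller ? "caller" : "worker";
        log.add(step + "@" + who);
    }
}
```

The returned log is start@caller, returned@caller, async@worker, script@worker, end@worker: the order of the steps is kept, but everything from the async step onward runs on the worker.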

    4. okay. Is there any special reason for handing the work over to another thread? Even the main thread could do the same work as the other thread. Is there any performance improvement? I am not able to get the point, sorry.

    5. that's why it's an option to run the task in background, main reasons are:
      - to not block the main thread
      - to put long running operations in background
      - to make sure previous tasks are not affected in case of that (async) task failure
