2014/02/06

jBPM 6 - store your process variables anywhere

Most jBPM users are aware of how jBPM stores process variables, but let's recap it here for completeness.

NOTE: this article covers jBPM with persistence enabled; without persistence, process variables are kept in memory only.

 jBPM puts a single requirement on the objects that are used as process variables:
  • the object must be serializable (that is, it must implement the java.io.Serializable interface)
With that, the jBPM engine is able to store all process variables as part of the process instance, using a marshaling mechanism backed by Google Protocol Buffers. That means the actual instances are marshaled into bytes and stored in the database. This is not always desired, especially for objects that are not actually owned by the process instance. For example:
  • JPA entities of another system
  • documents stored in document/content management system 
  • etc
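To make the default behaviour concrete, here is a minimal sketch using plain Java serialization only. It does not go through the jBPM marshaller, and the Customer class and helper method names are made up for illustration. It shows that the stored bytes are a snapshot copy, detached from the original object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableVariableSketch {

    // hypothetical process variable type; the only requirement is Serializable
    public static class Customer implements Serializable {
        private static final long serialVersionUID = 1L;
        private String name;

        public Customer(String name) { this.name = name; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // turn the whole object into bytes, as a serialization based strategy would
    public static byte[] toBytes(Object variable) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(variable);
        }
        return buffer.toByteArray();
    }

    // rebuild the object from the stored bytes
    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Customer original = new Customer("john");
        byte[] stored = toBytes(original);

        // a change made elsewhere after the variable was stored...
        original.setName("mary");

        // ...is not visible in the copy restored from the stored bytes
        Customer restored = (Customer) fromBytes(stored);
        System.out.println(restored.getName()); // prints "john"
    }
}
```

The restored copy still carries the old name, which is exactly the problem when the object is really owned by another system.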
Luckily, jBPM has a solution for that as well: the pluggable Variable Persistence Strategy. Out of the box jBPM provides two strategies:
  • serialization based, mentioned above, which works on all object types as long as they are serializable (org.drools.core.marshalling.impl.SerializablePlaceholderResolverStrategy)
  • JPA based, which works on objects that are entities (org.drools.persistence.jpa.marshaller.JPAPlaceholderResolverStrategy)
Let's spend some time on the JPA based strategy, as it can be rather useful in many cases where jBPM is used in embedded mode. Consider a scenario where our business process uses entities as process variables. The same entities might be altered from outside of the process, and we would like to keep them up to date within the process as well. To do so, we need to use the JPA based strategy for variable persistence, which is capable of storing entities in the database and then retrieving them.
To configure a variable persistence strategy you need to place it into the environment that is then used when creating knowledge sessions. Note that the order of the strategies is important: they are evaluated in the order they are given, and the first one that accepts a variable is used. Best practice is to always put the serialization based strategy last.
An example of how you can use it with RuntimeManager:


// create entity manager factory
EntityManagerFactory emf = Persistence.createEntityManagerFactory("org.jbpm.sample");

RuntimeEnvironment environment = 
     RuntimeEnvironmentBuilder.Factory.get().newDefaultBuilder()
     .entityManagerFactory(emf) 
     .addEnvironmentEntry(EnvironmentName.OBJECT_MARSHALLING_STRATEGIES, 
          new ObjectMarshallingStrategy[]{
               // set the entity manager factory for the JPA strategy so it
               // knows how to store and read entities
               new JPAPlaceholderResolverStrategy(emf),
               // set the serialization based strategy as the last one to
               // deal with non-entity classes
               new SerializablePlaceholderResolverStrategy( 
                          ClassObjectMarshallingStrategyAcceptor.DEFAULT  )
         })  
     .addAsset(ResourceFactory.newClassPathResource("cmis-store.bpmn"), 
               ResourceType.BPMN2)
     .get();
// create the runtime manager and start using entities as part of your process
RuntimeManager manager =
     RuntimeManagerFactory.Factory.get().newSingletonRuntimeManager(environment);

Now that we know how to configure it, let's take some time to understand how it actually works. First of all, every process variable, at the time it is about to be persisted, is evaluated against the strategies, and it is up to each strategy to accept or reject the given variable. If a strategy accepts the variable, only that strategy is used to persist it; if it rejects it, the remaining strategies are consulted.
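The selection rule can be illustrated without any jBPM classes. The sketch below uses a simplified stand-in interface (Strategy, with only accept and a name) rather than the real ObjectMarshallingStrategy, and all names in it are hypothetical. It shows why the order matters and why a catch-all strategy belongs at the end.

```java
import java.util.Arrays;
import java.util.List;

public class StrategyOrderSketch {

    // simplified stand-in for a marshalling strategy: only the accept decision
    public interface Strategy {
        boolean accept(Object variable);
        String name();
    }

    // hypothetical factory: a strategy that accepts instances of the given type
    public static Strategy of(final String name, final Class<?> acceptedType) {
        return new Strategy() {
            public boolean accept(Object variable) {
                return acceptedType.isInstance(variable);
            }
            public String name() {
                return name;
            }
        };
    }

    // walk the strategies in the configured order; the first one that accepts wins
    public static Strategy select(List<Strategy> strategies, Object variable) {
        for (Strategy strategy : strategies) {
            if (strategy.accept(variable)) {
                return strategy;
            }
        }
        throw new IllegalStateException("No strategy accepts " + variable);
    }

    public static void main(String[] args) {
        Strategy specific = of("specific", Number.class);
        Strategy catchAll = of("serialization", Object.class);

        // specific strategy first, catch-all last
        List<Strategy> goodOrder = Arrays.asList(specific, catchAll);
        System.out.println(select(goodOrder, 42).name());     // prints "specific"
        System.out.println(select(goodOrder, "text").name()); // prints "serialization"

        // catch-all first: the specific strategy is never reached
        List<Strategy> badOrder = Arrays.asList(catchAll, specific);
        System.out.println(select(badOrder, 42).name());      // prints "serialization"
    }
}
```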

Note: make sure that you add your entity classes to the persistence.xml that will be used by the JPA strategy.

The JPA strategy accepts only classes that declare a field with the @Id annotation (javax.persistence.Id), which ensures there is a unique id to use when retrieving the variable.
The serialization based strategy simply accepts all variables by default, and thus it should be the last strategy in line. This default behavior can be altered by providing another acceptor implementation.
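As a rough illustration of an accept check based on an id annotation, the sketch below looks for an annotated field via reflection. To stay dependency free it declares its own Id annotation as a stand-in for javax.persistence.Id, and walking up the superclasses is an assumption of this sketch; the real JPAPlaceholderResolverStrategy may well inspect more than this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class IdAcceptSketch {

    // local stand-in for javax.persistence.Id, so the sketch has no dependencies
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Id {
    }

    // hypothetical entity-like class: declares a field marked with @Id
    public static class Person {
        @Id
        private Long id;
        private String name;
    }

    // hypothetical plain class: no id field, so it should be rejected
    public static class Note {
        private String text;
    }

    // accept only objects whose class (or a superclass) declares an @Id field
    public static boolean accept(Object variable) {
        if (variable == null) {
            return false;
        }
        for (Class<?> type = variable.getClass(); type != null; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (field.isAnnotationPresent(Id.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(accept(new Person())); // prints "true"
        System.out.println(accept(new Note()));   // prints "false"
    }
}
```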

Once a strategy accepts the variable, it performs the marshaling operation to store the variable and the unmarshaling operation to retrieve it from the back-end store (of the type it supports).

In the case of JPA, marshaling checks whether the entity is already stored (has its id set) and:

  • if not, it persists the entity using the entity manager factory that was assigned to it
  • if yes, it merges it with the persistence context to make sure up-to-date information is stored
When unmarshaling, it uses the unique id of the entity to load it from the database and provide it as the process variable. It's that simple :)
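The persist-or-merge flow can be sketched with an in-memory map standing in for the database and the entity manager. This is not the code of JPAPlaceholderResolverStrategy; the Order class, the method names, and the id generation are all assumptions made for the example. The point is that only the id needs to travel with the process instance, so changes made from outside are visible on the next read.

```java
import java.util.HashMap;
import java.util.Map;

public class PlaceholderSketch {

    // hypothetical entity; id stays null until it has been stored once
    public static class Order {
        public Long id;
        public String status;

        public Order(String status) {
            this.status = status;
        }
    }

    // in-memory stand-in for the database table behind the entity manager
    private final Map<Long, Order> table = new HashMap<Long, Order>();
    private long nextId = 1;

    private static Order copy(Order source) {
        Order target = new Order(source.status);
        target.id = source.id;
        return target;
    }

    // returns the id, the only thing that is kept with the process instance
    public Long marshal(Order order) {
        if (order.id == null) {
            order.id = nextId++;           // "persist": first store assigns the id
        }
        table.put(order.id, copy(order));  // "merge": stored state is brought up to date
        return order.id;
    }

    // loads the current state by id; returns null if nothing is stored under it
    public Order unmarshal(Long id) {
        Order stored = table.get(id);
        return stored == null ? null : copy(stored);
    }

    // simulates another system changing the same row outside of the process
    public void updateFromOutside(Long id, String status) {
        table.get(id).status = status;
    }

    public static void main(String[] args) {
        PlaceholderSketch store = new PlaceholderSketch();
        Long id = store.marshal(new Order("NEW"));

        store.updateFromOutside(id, "SHIPPED");

        // the process sees the external change because it reloads by id
        System.out.println(store.unmarshal(id).status); // prints "SHIPPED"
    }
}
```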

With that, we have quickly covered the default (serialization based) strategy and the JPA based strategy. But the title of this article says we can store variables anywhere, so how is that possible?
It's possible because of the nature of variable persistence strategies: they are pluggable. We can create our own and simply add it to the environment, and process variables that meet the acceptance criteria of the strategy will be persisted by that strategy. To not leave you empty-handed, let's look at another implementation I created for the purpose of this article (although, having worked on it, I believe it will become more than just an example for this article).

Implementing a variable persistence strategy is actually very simple; it's a matter of implementing a single interface: org.kie.api.marshalling.ObjectMarshallingStrategy

public interface ObjectMarshallingStrategy {
    
    public boolean accept(Object object);

    public void write(ObjectOutputStream os,
                      Object object) throws IOException;
    
    public Object read(ObjectInputStream os) throws IOException, ClassNotFoundException;
    

    public byte[] marshal( Context context,
                           ObjectOutputStream os,
                           Object object ) throws IOException;
    
    public Object unmarshal( Context context,
                             ObjectInputStream is,
                             byte[] object,
                             ClassLoader classloader ) throws IOException, ClassNotFoundException;

    public Context createContext();
}

The most important methods for us are:

  • accept - decides if this strategy will be responsible for persistence of the given object
  • marshal - performs the operation to store the process variable
  • unmarshal - performs the operation to retrieve the process variable
The remaining ones are there for backward compatibility with the old marshaling framework used prior to protobuf, so implementing them is not mandatory, but it is worth putting the logic there too, as it will most likely be the same as for marshal (write) and unmarshal (read).

The example implementation stores and retrieves process variables as documents in content/document management systems that support access to the repository using CMIS. I used Apache Chemistry as the integration component, as it can easily talk to CMIS-enabled systems such as Alfresco.


So first, a few requirements:

  • process variables must be of a certain type to be stored in the content repository
  • documents (process variables stored in the CMS) can be:
    • created
    • updated (with versioning)
    • read
  • process variables must be kept up to date
All of this sounds simple, and of course that's the point: to keep it simple at this stage. A CMS can be used for much more, but we wanted to get started and then enhance it if needed. So the implementation of the strategy, org.jbpm.integration.cmis.impl.OpenCMISPlaceholderResolverStrategy, supports the following:
  • when marshaling
    • create a new document if it does not have an object id assigned yet
    • update the document if it already has an object id assigned
      • by overriding existing content
      • by creating a new major version of the document
      • by creating a new minor version of the document
  • when unmarshaling
    • load the content of the document based on the given object id
So you can actually use this strategy for:
  • creating new documents from the process based on custom content
  • updating existing documents with custom content
  • loading existing documents into a process variable based on object id only
These are very high level details, but let's look at the actual code that does that "magic". Let's start with the marshal logic. Note that it is a bit simplified here for readability; the complete code can be found on GitHub.


public byte[] marshal(Context context, ObjectOutputStream os, Object object) throws IOException {
 Document document = (Document) object;
 // connect to repository
 Session session = getRepositorySession(user, password, url, repository);
 try {
  if (document.getDocumentContent() != null) {
   // no object id yet, let's create the document
   if (document.getObjectId() == null) {
    Folder parent = ... // find folder by path
    if (parent == null) {
     parent = .. // create folder
    }
    // now we are ready to create the document in CMS
   } else {
      // object id exists so time to update     
   }
  }
  // now we need to store some info as part of the process instance
  // so we can look it up later on; in this case it is the object id and class
  // that we use as process variable, so we can recreate the instance on read
  ByteArrayOutputStream buff = new ByteArrayOutputStream();
  ObjectOutputStream oos = new ObjectOutputStream( buff );
  oos.writeUTF(document.getObjectId());
  oos.writeUTF(object.getClass().getCanonicalName());
  oos.close();
  return buff.toByteArray();
 } finally {
  // let's clear the session in the end
  session.clear();
 }
}

As you can see, it first deals with the actual storage (in this case a CMIS based repository) and then saves a few small details needed to recreate the actual object instance on reading: the objectId and the fully qualified class name of the process variable. And that's it. A process variable of type Document will be stored inside the content repository.
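The second half of marshal, writing just a reference into the process instance, can be tried in isolation. The sketch below uses only java.io; the class and method names are made up for illustration, the values passed in are placeholders, and it reads back with a plain ObjectInputStream rather than the DroolsObjectInputStream used by the strategy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class ReferenceBytesSketch {

    // write only the object id and class name, not the document content
    public static byte[] writeReference(String objectId, String className) throws IOException {
        ByteArrayOutputStream buff = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buff)) {
            oos.writeUTF(objectId);
            oos.writeUTF(className);
        }
        return buff.toByteArray();
    }

    // read the two values back in the same order they were written
    public static String[] readReference(byte[] bytes) throws IOException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            String objectId = ois.readUTF();
            String className = ois.readUTF();
            return new String[] { objectId, className };
        }
    }

    public static void main(String[] args) throws IOException {
        // placeholder values, not real repository data
        byte[] stored = writeReference("doc-123", "com.example.MyDocument");
        String[] reference = readReference(stored);
        System.out.println(reference[0] + " " + reference[1]); // prints "doc-123 com.example.MyDocument"
    }
}
```

One thing worth keeping in mind: writeUTF throws a NullPointerException for a null string, so the object id has to be set before the reference is written.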

Then let's look at the unmarshal method:


public Object unmarshal(Context context, ObjectInputStream ois, byte[] object, ClassLoader classloader) throws IOException, ClassNotFoundException {
 DroolsObjectInputStream is = new DroolsObjectInputStream( new ByteArrayInputStream( object ), classloader );
 // first we read out the object id and class name we stored during marshaling
 String objectId = is.readUTF();
 String canonicalName = is.readUTF();
 // connect to repository
 Session session = getRepositorySession(user, password, url, repository);
 try {
  // get the document from repository and create new instance of the variable class
  CmisObject doc = .....
  Document document = (Document) Class.forName(canonicalName).newInstance();
  // populate process variable with meta data and content
  document.setObjectId(objectId);
  document.setDocumentName(doc.getName());   
  document.setFolderName(getFolderName(doc.getParents()));
  document.setFolderPath(getPathAsString(doc.getPaths()));
  if (doc.getContentStream() != null) {
   ContentStream stream = doc.getContentStream();
   document.setDocumentContent(IOUtils.toByteArray(stream.getStream()));
   document.setUpdated(false);
   document.setDocumentType(stream.getMimeType());
  }
  return document;
 } catch(Exception e) {
  throw new RuntimeException("Cannot read document from CMIS", e);
 } finally {
  // do some clean up...
  is.close();
  session.clear();
 }
}

Nothing more than the logic to read the object id and class name so the instance can be recreated, then load the document from the CMS repository, and we're done :)

Last but not least, the accept method.


public boolean accept(Object object) {
    if (object instanceof Document) {
        return true;
    }
    return false;
}

And that is all that is needed to implement your own variable persistence strategy. The only thing left is to register the strategy on the environment so it will be evaluated when storing/retrieving variables. It's done the same way as described for the JPA based one.

Complete source code with some tests showing a complete use case from a process can be found here. Enjoy and feel free to provide feedback; maybe it's worth starting a repository of such strategies so we can have a rather rich set of strategies to use...

25 comments:

  1. What does ManageVariablesProcessEventListener do? I mean I know what it does, but why is it necessary? Are the variables not stored on process completion?

  2. Process variables belong to the process instance, and since a completed process instance is removed from the database, process variables are not serialized at that point; hence this listener, which might be required in some cases. However, it only covers updating the variable, and in some cases the same approach as for the process instance should be applied, such as removing the variables instead of updating them. Does that help?

  3. How can i remove Process Instance from Memory, and restore it with same state? Anyone have an idea?

  4. I was able to use your examples for 6.2 and create new processes with a custom JPA entity. However, I'm having problems using the remote API to retrieve the variable. When I hit the rest endpoint, "/rest/task/11/content?deploymentId=myDeploymentId", I see this warning in the log

    Exception while unmarshaling content: java.lang.IllegalStateException: No strategy of type persistence.ProjectJPAResolverStrategy available.

    I do get a response with a long content string but I don't know how to deserialize it.

    Replies
    1. could you open jira (https://issues.jboss.org/browse/JBPM) with all details in it as it looks like a bug. Please attach process you use to reproduce the issue and complete stack trace.

  5. Would be possible to use this same approach (used for CMIS) but for an "Object-based file systems" impl (eg: CEPH) or a JSR-283 impl (eg: ModeShape) to store process documents? Does it make sense?

    Replies
    1. Of course, any kind of storage that fits your needs is applicable here. Just implement the marshaller strategy for it, register it via the deployment descriptor, and off you go!

  6. Thank you for the information.
    Can we use a custom JPA strategy inside a KIE-Server Extension which you described in your other post.
    Basically I want to store all my process variables into separate Database and dont want to specify the JPA strategy in the kjar since everyone who uses the extension will have to remember to specify the JPA in their kie-deployment descriptors. I would like to have the JPA strategy defined in the extension itself so that any data that is sent as part of the custom extension REST API call will use it to store it in separate Database.

    Replies
    1. You can create a global/server level deployment descriptor that will then apply to all kjars deployed to that kie server. See documentation for more details: https://docs.jboss.org/jbpm/release/6.5.0.Final/jbpm-docs/html/ch14.html#d0e15405

  7. awesome. thank you Maciej

  8. Hi Maciej,

    Thanks for the post.
    Can you please explain about the usage of JPA service task in detail with an example.

    Thanks and Regards,
    Somesh

    Replies
    1. here you can find a sample workbench project https://github.com/mswiderski/bpm-projects/tree/master/jpa-project

  9. This comment has been removed by the author.

  10. Hi Maciej,

    How can I integrate your artifact jbpm-open-cmis into the Kie-Workbench, which we use remotely (REST) for creating and deploying processes, so that the OpenCMISPlaceholderResolverStrategy is automatically registered and we can create (in the kie workbench IDE) variables of type org.jbpm.Document?
    Thank you
    Aymen

    Replies
    1. you need to package it and include (with all dependencies) into WEB-INF/lib of the workbench. Then register OpenCMISPlaceholderResolverStrategy via deployment descriptor as marshalling strategy and off you go!

  11. Hi Maciej, just wondering how to pass this jpa entity as parameters when invoking the jbpm process using the kie server REST API? When I tried to pass it as a json, it didn't use the jpa marshalling strategy. Instead it used the default strategy. Your thoughts?

    Replies
    1. in general all custom types (for instance those that are in kjar or dependency of the kjar) need to be wrapped with type information:

      {
      "variable" : {
      "JpaEntity" : {
      "field" : "value"
      }
      }
      }

      where JpaEntity is the simple class name, but you can use FQCN as well.

    2. Thanks Maciej. I was able to get it to work now. Appreciate your prompt response.

  12. Hi Maciej,

    I'm successfully running jbpm 6.3 integrated in a java ee 6 application deployed on jboss eap 6.4 server.
    I want to upgrade my server to eap 7.0 and jbpm 7.3 but I have a problem with the JPAPlaceholderResolverStrategy.

    I'm getting javax.persistence.OptimisticLockException when I complete a user task. I figured out that the marshall method is invoked once for the user task completion(that is for the task output mappings) and also once for the NEXT task creation (this is new from jbpm 6.4 - Task variables auditing).

    I would really appreciate if you could give me some answers or suggestions!

    1. Is there a way to disable Task variables auditing?

    2. If this is the wrong approach... how can variable persistence work now? I have one main local process variable which is the input and output of all user tasks. Every task in the process updates the object(adds up new info). Because of that subsequent JPAPlaceholderResolverStrategy.marshall operations (EntityManager.merge) fail.

    3. What are the transaction boundaries for operations like task completion? I’m using jbpm-services-cdi and I’m not sure if my setup is ok. I use the PER_PROCESS_INSTANCE strategy for kjar deployment and a stateless EJB to talk to the jbpm-services-cdi wrapper services. Is JPAPlaceholderResolverStrategy thread safe? I also notice a big difference in the count of hibernate Sessions opened vs closed when I invoke task operations like complete. I’m sorry for this messy “question”. I will appreciate any comment…

    Replies
    1. Goran,

      this does not make too much sense to me... the marshaller strategy should be only invoked at transaction completion so it should not cause any optimistic lock exceptions.
      Moreover, task variable auditing do not use marshaller strategies, it simply uses toString of the object and saves that into db so it should not cause anything on the entity itself, unless toString is doing something funky.

      not sure why multiple calls to update/merge same entity would fail.. maybe there is an issue with transaction handling, what app server do you run it on?

      transaction boundaries are always the same safe points in the process - so when you complete a task it will call the process instance to move forward, and then it depends where the next safe point is, and this is where the transaction will end. If you need to enforce a safe point, mark the activity as async (in designer); that will complete the transaction before triggering that node and start it in another (background) thread.

    2. Sorry for the nonsense about Task variables auditing.

      On JBoss EAP 6.4 (jbpm 6.3.0.Final) I experience the following. The JPAPlaceholderResolverStrategy marshall method is invoked 4 times after UserTaskService complete. There is no OptimisticLockException because the subsequent marshall calls pass the object with the correct @Version property. But in EAP 7.0 (jbpm 7.3.0.Final) this is not the case. The second marshall tries to merge with old @Version and throws OptimisticLockException.

      Here is the stacktrace from JBoss EAP 6.4 (jbpm 6.3.0.Final). I did not understand why this is called more than once and now you said it shouldn’t.

      I'm limited to 4,096 characters for this comment. I guess this is not the place for stacktrace pasting so here is a cropped version, i hope it tells you something.

      First marshal call stacktrace:
      my code -> MyJPAPlaceholderResolverStrategy.marshal(ObjectMarshallingStrategy$Context, ObjectOutputStream, Object) line: 101
      ProtobufProcessMarshaller.marshallVariablesContainer(MarshallerWriteContext, Map) line: 302
      ContentMarshallerHelper.marshallContent(Object, Environment) line: 148
      ContentMarshallerHelper.marshal(Object, Environment) line: 59
      TaskContentServiceImpl.addOutputContent(long, Map) line: 60
      MVELLifeCycleManager.taskOperation(Operation, long, String, String, Map, List, OrganizationalEntity...) line: 383


      Second marshall call stacktrace:

      my code -> MyJPAPlaceholderResolverStrategy.marshal(ObjectMarshallingStrategy$Context, ObjectOutputStream, Object) line: 101
      ProtobufProcessMarshaller.marshallVariablesContainer(MarshallerWriteContext, Map) line: 302
      ContentMarshallerHelper.marshallContent(Object, Environment) line: 148
      ContentMarshallerHelper.marshal(Object, Environment) line: 59
      TaskInstanceServiceImpl.addTask(Task, Map) line: 116
      AddTaskCommand.execute(Context) line: 109
      AddTaskCommand.execute(Context) line: 53

      UserTaskServiceCDIImpl(UserTaskServiceImpl).complete(Long, String, Map) line: 188

      my code -> BpmService.completeTask(TaskSummary, Map) line: 206


      Third marshal call:

      my code -> MyJPAPlaceholderResolverStrategy.marshal(ObjectMarshallingStrategy$Context, ObjectOutputStream, Object) line: 101
      ProtobufProcessMarshaller.marshallVariable(MarshallerWriteContext, String, Object) line: 265
      ProtobufRuleFlowProcessInstanceMarshaller(AbstractProtobufProcessInstanceMarshaller).writeProcessInstance(MarshallerWriteContext, ProcessInstance) line: 139
      ProtobufRuleFlowProcessInstanceMarshaller(AbstractProtobufProcessInstanceMarshaller).writeProcessInstance(MarshallerWriteContext, ProcessInstance) line: 62
      ProcessInstanceInfo.transform() line: 232
      TriggerUpdateTransactionSynchronization.beforeCompletion() line: 57
      JtaTransactionSynchronizationAdapter.beforeCompletion() line: 54
      ..
      UserTaskServiceCDIImpl(UserTaskServiceImpl).complete(Long, String, Map) line: 188
      UserTaskServiceCDIImpl$Proxy$_$$_WeldClientProxy.complete(Long, String, Map) line: not available
      my code -> BpmService.completeTask(TaskSummary, Map) line: 206

      Fourth marshall call:

      my code -> MyJPAPlaceholderResolverStrategy.marshal(ObjectMarshallingStrategy$Context, ObjectOutputStream, Object) line: 101
      ProtobufProcessMarshaller.marshallVariable(MarshallerWriteContext, String, Object) line: 265
      ProtobufProcessMarshaller.writeWorkItem(MarshallerWriteContext, WorkItem, boolean) line: 218
      ProtobufProcessMarshaller.writeWorkItem(MarshallerWriteContext, WorkItem) line: 359
      ProtobufOutputMarshaller.writeWorkItem(MarshallerWriteContext, WorkItem) line: 967
      WorkItemInfo.transform() line: 171
      ...
      UserTaskServiceCDIImpl(UserTaskServiceImpl).complete(Long, String, Map) line: 188
      UserTaskServiceCDIImpl$Proxy$_$$_WeldClientProxy.complete(Long, String, Map) line: not available
      my code -> BpmService.completeTask(TaskSummary, Map) line: 206


      Thank you for you answer!

    3. since this is user task it will perform marshalling for:
      - user task itself
      - work item that user task is associated with
      - process instance variable

      there might be more if that variable is then used in another user task.
      So it can't be easily changed, mainly because it cannot be changed for the case where process variables do not use JPA but some other mechanism. What you could do is implement a tracking mechanism on your entity, such as a transient field that is set by the marshaling strategy upon first marshaling; if it is set, skip further saves to the db.

    4. Dear Maciej,

      It is me again. JPAPlaceholderResolverStrategy is still giving me troubles. I have implemented a tracking mechanism as you suggested (to execute em.merge only the first time) but there is a problem if you have: [User Task 1] -> entity -> [ Service Task 1] -> entity -> [User Task 2] if the service tasks alters the entity then the changes are not updated.

      I have then removed the tracking mechanism and tried to bypass the optimistic locking problem which exists after this commit (https://github.com/kiegroup/jbpm/commit/cbb0b79e59a1f6c9908ea020dbeece14080fc6f9#diff-7f040bffd3b626ca16c873364bbde5f8) because invocations of the marshal method always use a transient entity, never the managed one that EntityManager merge returns. I managed to get around the optimistic lock problem by querying the database for the latest @Version value and overwriting it in memory just before the em.merge line in the marshal method. But the multiple invocations of marshal are problematic for the same reason as mentioned above; this time there is no exception, but I am getting duplicate inserts for each relation entity that is created (transient, no pk in memory) in the service task.

      Do you have any advice for this problem. As I see it jBPM 6.4.0+ is not compatible with JPA optimistic locking and service tasks are barely usable.

  13. Hi Maciej,

    I am new to JBPM and I have a very basic question. I am passing a list of objects (List) from my parent process to a child embedded sub process. In the child subprocess, I am not able to get the individual object (Person) in my custom work item. I saw your blog explaining this, which passes a collection and is able to process each individual object in the sub process. Somehow it is not working for me. I am using eclipse for modeling. Could you please point out what I might be missing here?

    Thanks
    Raj
