Monday, January 5, 2015

Consuming EBS events (WF_BPEL_Q) using Java/PLSQL AQ API

When we raise event in EBS, it is enqueued in durable subscriber queue (WF_BPEL_Q). We could use EBS API or pure AQ (java/plsql) API to dequeue them. The message type stored in this queue is object type (WF_EVENT_T)

Use below query to check list of subscribers and subscriber number. Both queries shows subscriber list, but second query shows subscriber number, which is very helpful to troubleshoot later.

 SELECT * FROM ALL_QUEUE_SUBSCRIBERS WHERE QUEUE_NAME = 'WF_BPEL_Q';  
 SELECT * FROM AQ$_WF_BPEL_QTAB_S WHERE NAME IS NOT NULL;  


In WF_BPEL_QTAB, we can see all messages, and AQ$_WF_BPEL_QTAB_I would store copy of the message for each subscriber. The durable subscription is explained here in detail.

 SELECT * FROM AQ$_WF_BPEL_QTAB_I;  


Enqueue using PLSQL (instead of raise event)
 DECLARE  
   enqueue_options   dbms_aq.enqueue_options_t;  
   message_properties dbms_aq.message_properties_t;  
   message_handle   RAW(16);  
   message       APPS.WF_EVENT_T;  
 BEGIN  
   message := APPS.WF_EVENT_T(50,SYSDATE,SYSDATE,NULL,APPS.WF_PARAMETER_LIST_T(APPS.WF_PARAMETER_T('USER_ID','20421'),APPS.WF_PARAMETER_T('RESP_ID','51378'),APPS.WF_PARAMETER_T('RESP_APPL_ID','800'),APPS.WF_PARAMETER_T('SECURITY_GROUP_ID','0'),APPS.WF_PARAMETER_T('ORG_ID','122'),APPS.WF_PARAMETER_T('PARTY_ID','3667135'),APPS.WF_PARAMETER_T('#CURRENT_PHASE','101')),'oracle.apps.ar.hz.Person.create','oracle.apps.ar.hz.Person.create659162',NULL,APPS.WF_AGENT_T('WF_BPEL_QAGENT','EBIZPROD.SPRINGSOA.COM'),NULL,NULL,NULL,NULL);  
   dbms_aq.enqueue(queue_name => 'WF_BPEL_Q',        
      enqueue_options   => enqueue_options,      
      message_properties  => message_properties,     
      payload       => message,          
      msgid        => message_handle);  
   COMMIT;  
 END;  



Register Custom Subscriber
Below code can be used to register custom durable subscriber to the queue.

 DECLARE  
   SUBSCRIBER SYS.AQ$_AGENT;  
 BEGIN  
   SUBSCRIBER := SYS.AQ$_AGENT('CUSTOM', NULL, NULL);  
   DBMS_AQADM.ADD_SUBSCRIBER(QUEUE_NAME => 'WF_BPEL_Q',SUBSCRIBER => SUBSCRIBER);  
 END;       

Dequeue using PLSQL
To Dequeue this using PLSQL, was a lot easier than Java, as WF_EVENT_T object is available at database level under APPS schema. PLSQL code below:

 DECLARE  
   DEQUEUE_OPTIONS   dbms_aq.dequeue_options_t;  
   MESSAGE_PROPERTIES dbms_aq.message_properties_t;  
   MESSAGE_HANDLE   RAW(16);  
   MESSAGE       APPS.WF_EVENT_T;  
 BEGIN  
      DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;  
      DEQUEUE_OPTIONS.CONSUMER_NAME := 'CUSTOM';  
      DEQUEUE_OPTIONS.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;  
      DBMS_AQ.DEQUEUE(QUEUE_NAME => 'WF_BPEL_Q',  
       DEQUEUE_OPTIONS  => DEQUEUE_OPTIONS,  
       MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,  
       PAYLOAD      => MESSAGE,  
       MSGID       => MESSAGE_HANDLE);  
      DBMS_OUTPUT.PUT_LINE ('Message: ' || MESSAGE );  
      DEQUEUE_OPTIONS.NAVIGATION := DBMS_AQ.NEXT_MESSAGE;  
      COMMIT;  
 END;  

Dequeue using Java API
To dequeue using Java using pure AQ API, I used JPublisher to convert WF_EVENT_T data type to WF_EVENT_T java class as below:

1) Running JPublisher on DOS prompt to generate Java object for WF_EVENT_T

 set DATABASE_HOME=C:\oracledb\product\11.2.0\dbhome_1  
 set JAVA_HOME=C:\Oracle\Java\hotspot\jdk  
 set CLASSPATH=%DATABASE_HOME%\jdbc\lib\ojdbc5.jar;%DATABASE_HOME%\sqlj\lib\translator.jar;%DATABASE_HOME%\sqlj\lib\runtime12.jar  
 %JAVA_HOME%\bin\java -classpath %CLASSPATH% oracle.jpub.Doit -url=jdbc:oracle:thin:@ebiz.springsoa.com:1521:ebiz -user=apps/***** -sql=WF_EVENT_T -usertypes=oracle -methods=false -package=com.springsoa.jpub -usertypes=jdbc  

This generate WF_EVENT_T.java, which you can copy to your project under right package folder (e.g. com.springsoa.jpub) in this case.

2) Writing Java code for Dequeue

 package com.springsoa.util;  
 import com.springsoa.jpub.WF_EVENT_T;  
 import java.sql.Connection;  
 import java.sql.DriverManager;  
 import java.util.HashMap;  
 import java.util.Map;  
 import oracle.AQ.AQAgent;  
 import oracle.AQ.AQDequeueOption;  
 import oracle.AQ.AQDriverManager;  
 import oracle.AQ.AQMessage;  
 import oracle.AQ.AQQueue;  
 import oracle.AQ.AQSession;  
 import oracle.sql.STRUCT;  
 public class AQQueueConsumer {  
   public AQSession createSession() {  
     Connection connection;  
     AQSession aqSession = null;  
     try {  
       Class.forName("oracle.jdbc.driver.OracleDriver");  
       connection = DriverManager.getConnection("jdbc:oracle:thin:@ebiz.springsoa.com:1521:ebiz","apps", "******");  
       connection.setAutoCommit(true);  
       Class.forName("oracle.AQ.AQOracleDriver");  
       aqSession = AQDriverManager.createAQSession(connection);  
     } catch (Exception ex) {  
       ex.printStackTrace();  
     }   
     return aqSession;  
   }  
   public void listDurableSubscribers(AQSession aqSession, String queueOwner, String queueName) throws Exception {  
     AQQueue queue = aqSession.getQueue(queueOwner, queueName);        // select * from all_queues  
     AQAgent[] agents = queue.getSubscribers();  
     for(int i=0; i<agents.length; i++) {  
       System.out.println(" Consumers : " + agents[i].getName() );  
     }  
   }  
   public Map<String,String> dequeue(AQSession aqSession, String queueOwner, String queueName) throws Exception {  
     Map<String,String> messageMap = new HashMap<String,String>();  
     AQQueue queue = aqSession.getQueue(queueOwner, queueName);        // select * from all_queues  
     AQDequeueOption aqDequeueOption = new AQDequeueOption();        // https://docs.oracle.com/cd/B19306_01/server.102/b14257/aq_views.htm  
     aqDequeueOption.setConsumerName("CUSTOM");   // ORA_6H0JECQ66OO48D1N6SRJIC1N61 ORA_70skccq26kok8cq6752j0da665      
     aqDequeueOption.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);  
     //aqDequeueOption.setCondition("tab.user_data.event_name = 'oracle.apps.xxbmc.employeePublish.updateEvent'");  
     aqDequeueOption.setWaitTime(AQDequeueOption.WAIT_NONE);  
     AQMessage message = queue.dequeue(aqDequeueOption, WF_EVENT_T.class);  
     if( message != null && message.getObjectPayload() != null && message.getObjectPayload().getPayloadData() != null ) {  
       WF_EVENT_T wfEventT = (WF_EVENT_T) message.getObjectPayload().getPayloadData();  
       messageMap.put("eventName",wfEventT.getEventName());  
       messageMap.put("eventKey",wfEventT.getEventKey());  
       if( wfEventT.getParameterList() != null && wfEventT.getParameterList().getArray() != null ) {  
         Object[] parameters = (Object[]) wfEventT.getParameterList().getArray();  
         for(int i=0;i<parameters.length;i++) {  
           //System.out.println(" parameter[" + i + "] : " + ( (oracle.sql.STRUCT)parameters[i] ).dump() );  
           STRUCT parameter = (oracle.sql.STRUCT)parameters[i];  
           Object[] attributes = parameter.getAttributes();  
           if(attributes.length == 2) {  
             messageMap.put((String)attributes[0],(String)attributes[1]);  
           }  
         }  
       }  
     }  
     return messageMap;  
   }  
   public static void main(String[] args) {  
     AQSession aq_sess = null;  
     try {  
       AQQueueConsumer aqQueueConsumer = new AQQueueConsumer();  
       AQSession aqSession = aqQueueConsumer.createSession();  
       aqQueueConsumer.listDurableSubscribers(aqSession,"APPS", "WF_BPEL_Q");  
       Map map = aqQueueConsumer.dequeue(aqSession,"APPS", "WF_BPEL_Q");  
       System.out.println(" map " + map );  
     } catch (Exception ex) {  
       System.out.println("Exception: " + ex);  
       ex.printStackTrace();  
     }  
   }  
 }  

In above code, dequeue method does the dequeue, code is quite similar to what we have in PLSQL, however some extra work needs to be done to get all parameters from WF_EVENT_T as WF_PARAMETER is not given as SQL object.

Please note that message in WF_BPEL_QTAB will not be removed or marked as status 2 until all durable subscribers consumes the message. However, message will be deleted for a specific subscriber ("CUSTOM" in above case) in AQ$_WF_BPEL_QTAB_I. 

Purge records

 EXECUTE DBMS_AQADM.PURGE_QUEUE_TABLE('APPS.WF_BPEL_QTAB', NULL, NULL);  

1 comment:

Unknown said...

Can the execution of the command below impact the operation of Bpel?

EXECUTE DBMS_AQADM.PURGE_QUEUE_TABLE('APPS.WF_BPEL_QTAB', NULL, NULL);