When we raise an event in EBS, it is enqueued in the durable subscriber queue WF_BPEL_Q. We can dequeue these messages with the EBS API or with the plain AQ API (Java or PL/SQL). The messages stored in this queue are of the object type WF_EVENT_T.
Use the queries below to list the subscribers of the queue. Both queries show the subscriber list, but the second one also shows the subscriber number, which is very helpful for troubleshooting later.
SELECT * FROM ALL_QUEUE_SUBSCRIBERS WHERE QUEUE_NAME = 'WF_BPEL_Q';
SELECT * FROM AQ$_WF_BPEL_QTAB_S WHERE NAME IS NOT NULL;
WF_BPEL_QTAB holds all messages, and AQ$_WF_BPEL_QTAB_I stores an entry for the message for each subscriber. The durable subscription is explained here in detail.
SELECT * FROM AQ$_WF_BPEL_QTAB_I;
Enqueue using PLSQL (instead of raise event)
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message APPS.WF_EVENT_T;
BEGIN
message := APPS.WF_EVENT_T(
50, SYSDATE, SYSDATE, NULL,
APPS.WF_PARAMETER_LIST_T(
APPS.WF_PARAMETER_T('USER_ID','20421'),
APPS.WF_PARAMETER_T('RESP_ID','51378'),
APPS.WF_PARAMETER_T('RESP_APPL_ID','800'),
APPS.WF_PARAMETER_T('SECURITY_GROUP_ID','0'),
APPS.WF_PARAMETER_T('ORG_ID','122'),
APPS.WF_PARAMETER_T('PARTY_ID','3667135'),
APPS.WF_PARAMETER_T('#CURRENT_PHASE','101')),
'oracle.apps.ar.hz.Person.create', 'oracle.apps.ar.hz.Person.create659162', NULL,
APPS.WF_AGENT_T('WF_BPEL_QAGENT','EBIZPROD.SPRINGSOA.COM'),
NULL, NULL, NULL, NULL);
dbms_aq.enqueue(queue_name => 'WF_BPEL_Q',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
COMMIT;
END;
Register Custom Subscriber
The code below registers a custom durable subscriber on the queue.
DECLARE
SUBSCRIBER SYS.AQ$_AGENT;
BEGIN
SUBSCRIBER := SYS.AQ$_AGENT('CUSTOM', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(QUEUE_NAME => 'WF_BPEL_Q',SUBSCRIBER => SUBSCRIBER);
END;
Dequeue using PLSQL
Dequeuing with PL/SQL was a lot easier than with Java, because the WF_EVENT_T object type is available at database level in the APPS schema. The PL/SQL code is below:
DECLARE
DEQUEUE_OPTIONS dbms_aq.dequeue_options_t;
MESSAGE_PROPERTIES dbms_aq.message_properties_t;
MESSAGE_HANDLE RAW(16);
MESSAGE APPS.WF_EVENT_T;
BEGIN
DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;
DEQUEUE_OPTIONS.CONSUMER_NAME := 'CUSTOM';
DEQUEUE_OPTIONS.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
DBMS_AQ.DEQUEUE(QUEUE_NAME => 'WF_BPEL_Q',
DEQUEUE_OPTIONS => DEQUEUE_OPTIONS,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,
PAYLOAD => MESSAGE,
MSGID => MESSAGE_HANDLE);
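-- WF_EVENT_T is an object type and cannot be concatenated to a string directly, so print individual attributes
DBMS_OUTPUT.PUT_LINE ('Event name: ' || MESSAGE.EVENT_NAME || ', event key: ' || MESSAGE.EVENT_KEY);
-- Only takes effect if DBMS_AQ.DEQUEUE is called again (for example in a loop)
DEQUEUE_OPTIONS.NAVIGATION := DBMS_AQ.NEXT_MESSAGE;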
COMMIT;
END;
Dequeue using Java API
To dequeue from Java with the plain AQ API, I used JPublisher to generate a WF_EVENT_T Java class from the WF_EVENT_T data type, as follows:
1) Running JPublisher at the DOS prompt to generate the Java class for WF_EVENT_T
set DATABASE_HOME=C:\oracledb\product\11.2.0\dbhome_1
set JAVA_HOME=C:\Oracle\Java\hotspot\jdk
set CLASSPATH=%DATABASE_HOME%\jdbc\lib\ojdbc5.jar;%DATABASE_HOME%\sqlj\lib\translator.jar;%DATABASE_HOME%\sqlj\lib\runtime12.jar
%JAVA_HOME%\bin\java -classpath %CLASSPATH% oracle.jpub.Doit -url=jdbc:oracle:thin:@ebiz.springsoa.com:1521:ebiz -user=apps/***** -sql=WF_EVENT_T -usertypes=oracle -methods=false -package=com.springsoa.jpub -usertypes=jdbc
This generates WF_EVENT_T.java, which you can copy into your project under the matching package folder (com.springsoa.jpub in this case).
2) Writing Java code for Dequeue
package com.springsoa.util;
import com.springsoa.jpub.WF_EVENT_T;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.Map;
import oracle.AQ.AQAgent;
import oracle.AQ.AQDequeueOption;
import oracle.AQ.AQDriverManager;
import oracle.AQ.AQMessage;
import oracle.AQ.AQQueue;
import oracle.AQ.AQSession;
import oracle.sql.STRUCT;
public class AQQueueConsumer {
public AQSession createSession() {
Connection connection;
AQSession aqSession = null;
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
connection = DriverManager.getConnection("jdbc:oracle:thin:@ebiz.springsoa.com:1521:ebiz","apps", "******");
connection.setAutoCommit(true);
Class.forName("oracle.AQ.AQOracleDriver");
aqSession = AQDriverManager.createAQSession(connection);
} catch (Exception ex) {
ex.printStackTrace();
}
return aqSession;
}
public void listDurableSubscribers(AQSession aqSession, String queueOwner, String queueName) throws Exception {
AQQueue queue = aqSession.getQueue(queueOwner, queueName); // select * from all_queues
AQAgent[] agents = queue.getSubscribers();
for(int i=0; i<agents.length; i++) {
System.out.println(" Consumers : " + agents[i].getName() );
}
}
public Map<String,String> dequeue(AQSession aqSession, String queueOwner, String queueName) throws Exception {
Map<String,String> messageMap = new HashMap<String,String>();
AQQueue queue = aqSession.getQueue(queueOwner, queueName); // select * from all_queues
AQDequeueOption aqDequeueOption = new AQDequeueOption(); // https://docs.oracle.com/cd/B19306_01/server.102/b14257/aq_views.htm
aqDequeueOption.setConsumerName("CUSTOM"); // ORA_6H0JECQ66OO48D1N6SRJIC1N61 ORA_70skccq26kok8cq6752j0da665
aqDequeueOption.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);
//aqDequeueOption.setCondition("tab.user_data.event_name = 'oracle.apps.xxbmc.employeePublish.updateEvent'");
aqDequeueOption.setWaitTime(AQDequeueOption.WAIT_NONE);
AQMessage message = queue.dequeue(aqDequeueOption, WF_EVENT_T.class);
if( message != null && message.getObjectPayload() != null && message.getObjectPayload().getPayloadData() != null ) {
WF_EVENT_T wfEventT = (WF_EVENT_T) message.getObjectPayload().getPayloadData();
messageMap.put("eventName",wfEventT.getEventName());
messageMap.put("eventKey",wfEventT.getEventKey());
if( wfEventT.getParameterList() != null && wfEventT.getParameterList().getArray() != null ) {
Object[] parameters = (Object[]) wfEventT.getParameterList().getArray();
for(int i=0;i<parameters.length;i++) {
//System.out.println(" parameter[" + i + "] : " + ( (oracle.sql.STRUCT)parameters[i] ).dump() );
STRUCT parameter = (oracle.sql.STRUCT)parameters[i];
Object[] attributes = parameter.getAttributes();
if(attributes.length == 2) {
messageMap.put((String)attributes[0],(String)attributes[1]);
}
}
}
}
return messageMap;
}
public static void main(String[] args) {
try {
AQQueueConsumer aqQueueConsumer = new AQQueueConsumer();
AQSession aqSession = aqQueueConsumer.createSession();
aqQueueConsumer.listDurableSubscribers(aqSession,"APPS", "WF_BPEL_Q");
Map<String,String> map = aqQueueConsumer.dequeue(aqSession,"APPS", "WF_BPEL_Q");
System.out.println(" map " + map );
} catch (Exception ex) {
System.out.println("Exception: " + ex);
ex.printStackTrace();
}
}
}
In the code above, the dequeue method does the dequeue. The code is quite similar to the PL/SQL version; however, some extra work is needed to read all the parameters from WF_EVENT_T, because WF_PARAMETER_T is not available as a typed Java object and each parameter has to be read as a generic STRUCT.
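The parameter handling can be isolated into a small helper. The sketch below is a simplified, driver-free illustration: it assumes each parameter has already been unpacked into a two-element Object[] (name, value), which is the shape STRUCT.getAttributes() returns in the dequeue method. The class name ParameterFlattener and the method toMap are hypothetical names used only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of any Oracle API.
final class ParameterFlattener {

    private ParameterFlattener() { }

    // Each row stands in for the Object[] returned by STRUCT.getAttributes()
    // for one WF_PARAMETER_T entry: index 0 is the name, index 1 the value.
    public static Map<String, String> toMap(Object[][] rows) {
        Map<String, String> result = new LinkedHashMap<>();
        if (rows == null) {
            return result;
        }
        for (Object[] attributes : rows) {
            // Skip anything that is not a (name, value) pair with a non-null name.
            if (attributes == null || attributes.length != 2 || attributes[0] == null) {
                continue;
            }
            Object value = attributes[1];
            // String.valueOf avoids a ClassCastException if a value is not a String.
            result.put(String.valueOf(attributes[0]),
                       value == null ? null : String.valueOf(value));
        }
        return result;
    }

    public static void main(String[] args) {
        Object[][] rows = {
            {"USER_ID", "20421"},
            {"ORG_ID", "122"},
            {"PARTY_ID", null}
        };
        System.out.println(toMap(rows));
    }
}
```

Compared with the direct (String) casts in the dequeue method, this version tolerates non-String values and malformed rows, which may or may not matter for the events you consume.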
Please note that a message in WF_BPEL_QTAB is not removed or marked with status 2 until all durable subscribers have consumed it. However, the entry for the specific subscriber ("CUSTOM" in the case above) is deleted from AQ$_WF_BPEL_QTAB_I.
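The bookkeeping described here can be illustrated with a toy in-memory model. This is only a sketch of the idea, not how Oracle AQ is implemented: the class DurableQueueModel, its methods, and the second subscriber name "OTHER" are all made up for the example. One map plays the role of WF_BPEL_QTAB and another plays the role of AQ$_WF_BPEL_QTAB_I.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of a multi-consumer queue. It is NOT Oracle's implementation.
final class DurableQueueModel {

    // Message id -> payload; plays the role of WF_BPEL_QTAB.
    private final Map<Integer, String> queueTable = new LinkedHashMap<>();
    // Message id -> subscribers that still have to consume it; plays the role of AQ$_WF_BPEL_QTAB_I.
    private final Map<Integer, Set<String>> pending = new LinkedHashMap<>();
    private final Set<String> subscribers = new LinkedHashSet<>();
    private int nextId = 1;

    public void addSubscriber(String name) {
        subscribers.add(name);
    }

    // Stores the message once and records one pending entry per current subscriber.
    public int enqueue(String payload) {
        if (subscribers.isEmpty()) {
            // A message with no recipients could never be consumed, so the model rejects it.
            throw new IllegalStateException("no subscribers registered");
        }
        int id = nextId++;
        queueTable.put(id, payload);
        pending.put(id, new LinkedHashSet<>(subscribers));
        return id;
    }

    // Returns the oldest message still pending for this consumer, or null if there is none.
    public String dequeue(String consumer) {
        Iterator<Map.Entry<Integer, Set<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Set<String>> entry = it.next();
            Integer id = entry.getKey();
            Set<String> waiting = entry.getValue();
            if (waiting.remove(consumer)) {
                String payload = queueTable.get(id);
                if (waiting.isEmpty()) {
                    // The last subscriber has consumed it: only now does the message itself go away.
                    it.remove();
                    queueTable.remove(id);
                }
                return payload;
            }
        }
        return null;
    }

    public int messagesInQueueTable() {
        return queueTable.size();
    }

    public static void main(String[] args) {
        DurableQueueModel queue = new DurableQueueModel();
        queue.addSubscriber("CUSTOM");
        queue.addSubscriber("OTHER"); // hypothetical second subscriber
        queue.enqueue("oracle.apps.ar.hz.Person.create");
        System.out.println("CUSTOM got: " + queue.dequeue("CUSTOM"));
        System.out.println("Messages in queue table: " + queue.messagesInQueueTable());
        System.out.println("OTHER got: " + queue.dequeue("OTHER"));
        System.out.println("Messages in queue table: " + queue.messagesInQueueTable());
    }
}
```

The model deletes a message as soon as the last subscriber has consumed it. In the real queue, whether a fully consumed message is deleted or kept as processed should depend on the retention configured for the queue, so treat the immediate deletion here as a simplification.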
Purge records
EXECUTE DBMS_AQADM.PURGE_QUEUE_TABLE('APPS.WF_BPEL_QTAB', NULL, NULL);