Friday, January 2, 2015

AQ Durable Subscription

An AQ durable subscription works like a durable topic subscription. The steps below set up a multi-consumer queue and show how the subscription works internally, which may help with troubleshooting.

 EXECUTE DBMS_AQADM.STOP_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q');  
 EXECUTE DBMS_AQADM.DROP_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q');  
 EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE ( QUEUE_TABLE => 'CS_MULTI_QTAB');  
 EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( QUEUE_TABLE     => 'CS_MULTI_QTAB', MULTIPLE_CONSUMERS => TRUE, QUEUE_PAYLOAD_TYPE => 'RAW' );   
 EXECUTE DBMS_AQADM.CREATE_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q', QUEUE_TABLE => 'CS_MULTI_QTAB', RETENTION_TIME => DBMS_AQADM.INFINITE );   
 EXECUTE DBMS_AQADM.START_QUEUE ( QUEUE_NAME     => 'CS_MULTI_Q');   

Create the durable subscribers:

 DECLARE  
   subscriber sys.aq$_agent;  
 BEGIN  
   subscriber := sys.aq$_agent('ONE', NULL, NULL);  
   DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'CS_MULTI_Q',subscriber => subscriber);  
   subscriber := sys.aq$_agent('TWO', NULL, NULL);  
   DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'CS_MULTI_Q',subscriber => subscriber);   
 END;  

Enqueue a message:

 DECLARE  
   ENQUEUE_OPTIONS   DBMS_AQ.ENQUEUE_OPTIONS_T;  
   MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;  
   MESSAGE_HANDLE   RAW(16);  
   MESSAGE       RAW(4096);  
 BEGIN  
   MESSAGE := HEXTORAW(RPAD('FF',4095,'FF'));   
   DBMS_AQ.ENQUEUE(QUEUE_NAME => 'CS_MULTI_Q',  
      ENQUEUE_OPTIONS   => ENQUEUE_OPTIONS,  
      MESSAGE_PROPERTIES  => MESSAGE_PROPERTIES,     
      PAYLOAD       => MESSAGE,          
      MSGID        => MESSAGE_HANDLE);  
   COMMIT;  
 END;  


The query below shows all the subscribers to this queue (ONE and TWO) and their subscriber numbers. (Its results differ from those of select * from all_queue_subscribers.)
 SELECT * FROM AQ$_CS_MULTI_QTAB_S WHERE NAME IS NOT NULL;  

The query below shows that the message has multiple entries, one for each subscriber.
 SELECT * FROM AQ$_CS_MULTI_QTAB_I;  

Dequeue the message as subscriber "TWO" using the PL/SQL block below:

 DECLARE  
   DEQUEUE_OPTIONS   DBMS_AQ.DEQUEUE_OPTIONS_T;  
   MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;  
   MESSAGE_HANDLE   RAW(16);  
   MESSAGE       RAW(4096);  
 BEGIN  
      DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;  
      DEQUEUE_OPTIONS.CONSUMER_NAME := 'TWO';  
      DEQUEUE_OPTIONS.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;  
      DBMS_AQ.DEQUEUE(QUEUE_NAME => 'CS_MULTI_Q',  
       DEQUEUE_OPTIONS  => DEQUEUE_OPTIONS,  
       MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,  
       PAYLOAD      => MESSAGE,  
       MSGID       => MESSAGE_HANDLE);  
      DBMS_OUTPUT.PUT_LINE ('Message: ' || RAWTOHEX(MESSAGE) );  
      COMMIT;  
 END;    

After this dequeue, the status field (the STATE column) in the queue table (CS_MULTI_QTAB) is still 0, because the other subscriber (ONE) has not yet dequeued the message. In table AQ$_CS_MULTI_QTAB_I, however, the entry for subscriber "TWO" has been deleted.

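The per-subscriber state can also be checked through the AQ$CS_MULTI_QTAB view, which AQ creates alongside the queue table. This is a minimal sketch that assumes the view's documented columns MSG_ID, CONSUMER_NAME and MSG_STATE. With retention enabled, I would expect the row for TWO to show as processed and the row for ONE as ready, but verify this on your own release.

```sql
-- One row per message per consumer for a multi-consumer queue table.
-- AQ$CS_MULTI_QTAB is the AQ-generated view over CS_MULTI_QTAB.
SELECT MSG_ID, CONSUMER_NAME, MSG_STATE
  FROM AQ$CS_MULTI_QTAB
 ORDER BY MSG_ID, CONSUMER_NAME;
```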

Once all subscribers have dequeued the message, the status field in the queue table (CS_MULTI_QTAB) is set to 2 if a retention time is set (as it is here, with DBMS_AQADM.INFINITE); otherwise the message is deleted from the queue table.

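To observe this, the remaining subscriber can dequeue the same message. The block below is a sketch that mirrors the dequeue shown earlier, with only the consumer name changed to ONE. The follow-up query assumes the queue table exposes the MSGID and STATE columns. The state change may be applied by an AQ background process, so it might not be visible immediately.

```sql
DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  message            RAW(4096);
BEGIN
  dequeue_options.wait          := DBMS_AQ.NO_WAIT;
  dequeue_options.consumer_name := 'ONE';  -- the subscriber that has not dequeued yet
  dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
  DBMS_AQ.DEQUEUE(queue_name         => 'CS_MULTI_Q',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

-- Check the message state after the last subscriber has dequeued.
SELECT MSGID, STATE FROM CS_MULTI_QTAB;
```

With NO_WAIT, the dequeue raises an error instead of blocking if no message is available for ONE, so run it only while the message is still pending for that subscriber.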

Purge Records

 EXECUTE DBMS_AQADM.PURGE_QUEUE_TABLE('CS_MULTI_QTAB', NULL, NULL);   

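A more selective purge is also possible. The sketch below passes an explicit purge options record and a purge condition. It assumes the condition is evaluated against the AQ$CS_MULTI_QTAB view under the alias qtview, and that 'PROCESSED' is the MSG_STATE value for fully consumed messages. Check both against the DBMS_AQADM documentation for your release before relying on it.

```sql
DECLARE
  purge_opts DBMS_AQADM.AQ$_PURGE_OPTIONS_T;
BEGIN
  purge_opts.block := FALSE;  -- do not block concurrent enqueuers and dequeuers
  DBMS_AQADM.PURGE_QUEUE_TABLE(
    queue_table     => 'CS_MULTI_QTAB',
    -- assumption: qtview is the alias for the AQ$CS_MULTI_QTAB view
    purge_condition => 'qtview.msg_state = ''PROCESSED''',
    purge_options   => purge_opts);
END;
/
```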