-- Tear down any earlier copy of the demo objects: stop the queue, drop it,
-- then drop the queue table it was created on.
-- NOTE(review): on a first run these three calls presumably raise errors because
-- the objects do not exist yet - confirm that this is acceptable for the script.
EXECUTE DBMS_AQADM.STOP_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q');
EXECUTE DBMS_AQADM.DROP_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q');
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE ( QUEUE_TABLE => 'CS_MULTI_QTAB');
-- Create the queue table with a RAW payload type and MULTIPLE_CONSUMERS enabled.
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( QUEUE_TABLE => 'CS_MULTI_QTAB', MULTIPLE_CONSUMERS => TRUE, QUEUE_PAYLOAD_TYPE => 'RAW' );
-- Create the queue on that table; RETENTION_TIME is set to DBMS_AQADM.INFINITE.
EXECUTE DBMS_AQADM.CREATE_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q', QUEUE_TABLE => 'CS_MULTI_QTAB', RETENTION_TIME => DBMS_AQADM.INFINITE );
-- Start the queue (no ENQUEUE/DEQUEUE arguments are passed, so the procedure's defaults apply).
EXECUTE DBMS_AQADM.START_QUEUE ( QUEUE_NAME => 'CS_MULTI_Q');
Create durable subscribers
-- Register two subscribers, 'ONE' and 'TWO', on queue CS_MULTI_Q.
-- Each agent is built with a name only; its other two attributes are NULL.
BEGIN
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'CS_MULTI_Q',
        subscriber => sys.aq$_agent('ONE', NULL, NULL)
    );
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'CS_MULTI_Q',
        subscriber => sys.aq$_agent('TWO', NULL, NULL)
    );
END;
En-queue Messages:
-- Enqueue one RAW message on CS_MULTI_Q using default enqueue options and
-- default message properties, then commit.
DECLARE
    -- Payload: RPAD builds a 4095-character string of 'F', converted to RAW.
    -- NOTE(review): 4095 is an odd number of hex digits - confirm this is
    -- intended rather than 4096.
    l_payload   RAW(4096) := HEXTORAW(RPAD('FF', 4095, 'FF'));
    l_enq_opts  DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msg_id    RAW(16);  -- receives the MSGID output of the enqueue call
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => 'CS_MULTI_Q',
        enqueue_options    => l_enq_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msg_id
    );
    COMMIT;
END;
The query below lists all the subscribers to this queue (ONE and TWO) and their numbers. (This query shows different results than select * from all_subscribers.)
-- All columns of AQ$_CS_MULTI_QTAB_S, restricted to rows that have a NAME.
SELECT s.*
FROM AQ$_CS_MULTI_QTAB_S s
WHERE s.NAME IS NOT NULL;
The query below shows the multiple copies of the message - one per subscriber.
-- All rows and columns of AQ$_CS_MULTI_QTAB_I.
SELECT i.*
FROM AQ$_CS_MULTI_QTAB_I i;
Dequeue the message as subscriber "TWO" using the PL/SQL block below:
-- Dequeue the first available message from CS_MULTI_Q as consumer 'TWO',
-- print its payload as hex via DBMS_OUTPUT, and commit the dequeue.
-- If no message is available for this consumer, a notice is printed instead
-- of the block failing with an unhandled ORA-25228.
DECLARE
    DEQUEUE_OPTIONS    DBMS_AQ.DEQUEUE_OPTIONS_T;
    MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
    MESSAGE_HANDLE     RAW(16);    -- receives the MSGID output of the dequeue call
    MESSAGE            RAW(4096);  -- receives the RAW payload
    -- ORA-25228: timeout or end-of-fetch during message dequeue.
    NO_MESSAGES        EXCEPTION;
    PRAGMA EXCEPTION_INIT(NO_MESSAGES, -25228);
BEGIN
    -- Do not block when the queue holds nothing for this consumer.
    DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;
    DEQUEUE_OPTIONS.CONSUMER_NAME := 'TWO';
    DEQUEUE_OPTIONS.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
    DBMS_AQ.DEQUEUE(
        QUEUE_NAME         => 'CS_MULTI_Q',
        DEQUEUE_OPTIONS    => DEQUEUE_OPTIONS,
        MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,
        PAYLOAD            => MESSAGE,
        MSGID              => MESSAGE_HANDLE
    );
    -- Convert the RAW payload to hex text explicitly rather than relying on
    -- implicit conversion in the concatenation.
    DBMS_OUTPUT.PUT_LINE('Message: ' || RAWTOHEX(MESSAGE));
    COMMIT;
EXCEPTION
    WHEN NO_MESSAGES THEN
        DBMS_OUTPUT.PUT_LINE('No message available for subscriber TWO');
END;
After we dequeue this message, we can see that in the queue table (CS_MULTI_QTAB) the status field is still set to 0. This is because the other subscribers (e.g. ONE) have not yet dequeued the message. We can also see that in table AQ$_CS_MULTI_QTAB_I, the entry for subscriber "TWO" has been deleted.
Once all subscribers have dequeued the message, the status field in the queue table (CS_MULTI_QTAB) will be set to 2 (if a retention time is set); otherwise the message will be deleted from the queue table.
Purge Records
-- Purge queue table CS_MULTI_QTAB, passing NULL for both the purge condition and the purge options.
EXECUTE DBMS_AQADM.PURGE_QUEUE_TABLE ( QUEUE_TABLE => 'CS_MULTI_QTAB', PURGE_CONDITION => NULL, PURGE_OPTIONS => NULL );
No comments:
Post a Comment