
Sunday, August 21, 2016

Oracle Advanced queues

Advanced Queuing (AQ) is a feature of Oracle that lets applications communicate over the network by putting messages into queues (enqueue) and getting messages from queues (dequeue). A message is stored in an object type variable, known as the payload, and can be transferred to remote databases over the network. The payload is then dequeued at the destination server. The payload type at the destination server must have the same datatype structure as the one on the primary machine.

Advantages of advanced queues -
Messages are not vulnerable to network, machine and application failures. Such messages are known as persistent messages. At worst there may be a delay in propagating queued messages to the destination servers.
The enqueuing (producer) programs and the dequeuing (consumer) programs can also run independently of each other. This is known as asynchronous communication.


Once a queue is created, a subscriber can be attached to it. A subscriber is an agent for which you supply various details.
On the host or primary database you give details such as the subscriber name, the destination queue to which the messages are propagated (addressed through a database link) and the protocol used to transfer the messages. The subscriber on the primary database is called a propagator.
A similar subscriber must also be created on the remote database and assigned to the remote queue from which the messages are dequeued. For a given queue, the subscriber name there is the same as the one on the primary database. The subscriber on the remote database is also called the recipient or consumer.
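
As a rough sketch of the setup described above, the block below adds a subscriber and schedules propagation on the primary database. It assumes a multi-consumer queue table (subscribers need one) and uses made-up names: demo_mc_queue_table, demo_mc_queue, demo_subscriber and the database link remote_db_link are not from this post. The payload type demo_queue_payload_type is the one created in the sample further down, and the block is assumed to run as aq_admin.

```sql
-- Sketch only: every name marked "hypothetical" is an assumption.
BEGIN
  -- Subscribers can only be added to queues in a multi-consumer queue table.
  DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table        => 'demo_mc_queue_table',       -- hypothetical
    queue_payload_type => 'demo_queue_payload_type',
    multiple_consumers => TRUE
  );

  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'demo_mc_queue',                    -- hypothetical
    queue_table => 'demo_mc_queue_table'
  );

  DBMS_AQADM.START_QUEUE (queue_name => 'demo_mc_queue');

  -- The agent address has the form <schema>.<queue>@<dblink>; protocol 0 is the default.
  DBMS_AQADM.ADD_SUBSCRIBER (
    queue_name => 'demo_mc_queue',
    subscriber => SYS.AQ$_AGENT('demo_subscriber',                        -- hypothetical
                                'aq_admin.demo_mc_queue@remote_db_link',  -- hypothetical
                                0)
  );

  -- Propagate messages to the database reached through the link.
  DBMS_AQADM.SCHEDULE_PROPAGATION (
    queue_name  => 'demo_mc_queue',
    destination => 'remote_db_link'                    -- hypothetical
  );
END;
/
```

The remote database would need a matching queue and subscriber before propagated messages can be dequeued there.
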
Users can assign a unique correlation identifier to a message, and the message can later be retrieved using this identifier.
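
For illustration, a correlation identifier could be set and used like this. The identifier ORDER-1001 and the message text are made-up values; the queue and payload type are the ones from the sample later in this post.

```sql
DECLARE
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            aq_admin.demo_queue_payload_type;
BEGIN
  -- Tag the message with a correlation identifier at enqueue time.
  o_payload := aq_admin.demo_queue_payload_type('Message for order 1001');
  r_message_properties.correlation := 'ORDER-1001';   -- hypothetical value

  DBMS_AQ.ENQUEUE(
    queue_name         => 'aq_admin.demo_queue',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );
  COMMIT;

  -- Later, ask only for the message that carries that identifier.
  r_dequeue_options.correlation := 'ORDER-1001';
  r_dequeue_options.wait        := DBMS_AQ.NO_WAIT;    -- do not block if nothing matches

  DBMS_AQ.DEQUEUE(
    queue_name         => 'aq_admin.demo_queue',
    dequeue_options    => r_dequeue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );
  COMMIT;
END;
/
```
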
There can be multiple consumers for a single message in a queue. A consumer can also subscribe to multiple queues and receive messages from each of them.
There are 3 options for consuming messages. They can be based on order, priority or a particular position relative to other messages.
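
One way to get priority-based consumption is to sort the queue table by priority and set a priority on each message. This is only a sketch: demo_prio_queue_table and demo_prio_queue are made-up names, and the block assumes you are connected as aq_admin with the payload type from the sample below already created.

```sql
BEGIN
  -- Messages are ordered by priority first, then by enqueue time.
  DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table        => 'demo_prio_queue_table',     -- hypothetical
    queue_payload_type => 'demo_queue_payload_type',
    sort_list          => 'PRIORITY,ENQ_TIME'
  );

  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'demo_prio_queue',                  -- hypothetical
    queue_table => 'demo_prio_queue_table'
  );

  DBMS_AQADM.START_QUEUE (queue_name => 'demo_prio_queue');
END;
/

DECLARE
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
BEGIN
  -- A smaller number means a higher priority.
  r_message_properties.priority := 1;

  DBMS_AQ.ENQUEUE(
    queue_name         => 'demo_prio_queue',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => demo_queue_payload_type('Urgent message'),
    msgid              => v_message_handle
  );
  COMMIT;
END;
/
```
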
Retention - AQ administrators can specify a retention period that keeps messages in the queue table after they have been dequeued, which helps with tracking and auditing. How long a message stays available for dequeuing is controlled separately by the expiration property of the message.
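
A retention period can be set on an existing queue roughly as follows. In DBMS_AQADM the retention_time parameter is expressed in seconds and counts from the time a message is dequeued. The value 86400 (one day) is only an assumed example, and the queue is the one from the sample later in this post.

```sql
BEGIN
  DBMS_AQADM.ALTER_QUEUE (
    queue_name     => 'aq_admin.demo_queue',
    retention_time => 86400    -- seconds; assumed illustrative value
  );
END;
/

-- Check the setting (run as the queue owner).
SELECT name, retention
FROM   user_queues
WHERE  name = 'DEMO_QUEUE';
```
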

Points to consider
The job_queue_processes parameter should be set to a suitable value based on the number of queues in the database, the number of existing jobs and so on. A value of 0 means messages won't be propagated or dequeued.
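
The current setting can be checked and changed along these lines. The value 10 is only an assumed example, and changing the parameter needs the ALTER SYSTEM privilege.

```sql
-- Current value (SQL*Plus command)
SHOW PARAMETER job_queue_processes

-- The same check as a query
SELECT value
FROM   v$parameter
WHERE  name = 'job_queue_processes';

-- Change the value; 10 is an assumed illustrative number
ALTER SYSTEM SET job_queue_processes = 10;
```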


.......To be continued


Below is a sample queue for enqueuing and dequeuing messages on the same database

CONNECT sys/change_on_install as sysdba

---------------------------------------------------------

DROP USER aq_admin CASCADE;

CREATE USER aq_admin IDENTIFIED BY aq_admin
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp;

ALTER USER aq_admin QUOTA UNLIMITED ON users;

GRANT aq_administrator_role TO aq_admin;
GRANT connect               TO aq_admin;
GRANT create type           TO aq_admin;
GRANT create sequence       TO aq_admin;
GRANT CREATE JOB TO aq_admin;
GRANT CREATE PROCEDURE TO aq_admin;
GRANT CREATE TABLE TO aq_admin;

EXECUTE dbms_aqadm.grant_type_access('aq_admin');

-- -------------------------------------------------------

DROP USER aq_user CASCADE;

CREATE USER aq_user IDENTIFIED BY aq_user
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp;

GRANT aq_user_role TO aq_user;
GRANT connect      TO aq_user;
GRANT EXECUTE ON DBMS_AQ TO aq_user;
GRANT EXECUTE ON DBMS_AQ TO aq_admin;
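-- Run the next grant only after aq_admin.PR_AQ_DEQUEUE has been created
-- (the procedure is defined further down); it fails if the procedure does not exist yet.
GRANT EXECUTE ON aq_admin.PR_AQ_DEQUEUE TO aq_user;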


CONNECT aq_admin/aq_admin

CREATE TYPE demo_queue_payload_type AS OBJECT
( message VARCHAR2(4000) );
/
GRANT EXECUTE ON aq_admin.demo_queue_payload_type to aq_user;

GRANT ENQUEUE ANY QUEUE TO aq_user;
GRANT DEQUEUE ANY QUEUE TO aq_user;

BEGIN
     DBMS_AQADM.CREATE_QUEUE_TABLE (
        queue_table        => 'demo_queue_table',
        queue_payload_type => 'demo_queue_payload_type'
        );
END;
/

BEGIN
    DBMS_AQADM.CREATE_QUEUE (
     queue_name  => 'demo_queue',
     queue_table => 'demo_queue_table'
    );

    DBMS_AQADM.START_QUEUE (
        queue_name => 'demo_queue'
    );
END;
/
SHOW ERRORS

SELECT object_name, object_type
FROM   user_objects
WHERE  object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';


DECLARE
     r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
     v_message_handle     RAW(16);
     o_payload            aq_admin.demo_queue_payload_type;

  BEGIN

    o_payload := aq_admin.demo_queue_payload_type('Here is a message 2');

    DBMS_AQ.ENQUEUE(
     queue_name         => 'aq_admin.demo_queue',
     enqueue_options    => r_enqueue_options,
     message_properties => r_message_properties,
     payload            => o_payload,
     msgid              => v_message_handle
    );
COMMIT;
END;
/


SELECT COUNT(*) FROM aq$demo_queue_table;

SELECT user_data FROM   aq$demo_queue_table;

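-- Enable DBMS_OUTPUT display in SQL*Plus so the messages printed below are visible.
SET SERVEROUTPUT ON

DECLARE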
     r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
     v_message_handle     RAW(16);
     o_payload            aq_admin.demo_queue_payload_type;

BEGIN

    r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

    DBMS_AQ.DEQUEUE(
      queue_name         => 'aq_admin.demo_queue',
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
     );

    DBMS_OUTPUT.PUT_LINE('*** Browsed message is [' || o_payload.message || '] ***');

END;
/


DECLARE

     r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
     v_message_handle     RAW(16);
     o_payload            aq_admin.demo_queue_payload_type;

BEGIN

    DBMS_AQ.DEQUEUE(
      queue_name         => 'aq_admin.demo_queue',
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
     );

    DBMS_OUTPUT.PUT_LINE('*** Dequeued message is [' || o_payload.message || '] ***');

    COMMIT;
END;
/


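-- Create this job only after PR_AQ_DEQUEUE (defined below) has been compiled,
-- because the job is enabled immediately and calls the procedure.
BEGIN
  DBMS_SCHEDULER.CREATE_JOB (
    job_name   => 'AQ_JOB',
    job_type   => 'PLSQL_BLOCK',
    job_action => 'BEGIN PR_AQ_DEQUEUE; END;',
    -- TO_DATE(SYSDATE) relies on an implicit conversion through NLS_DATE_FORMAT
    -- and can lose the time portion, so pass the current timestamp directly.
    start_date => SYSTIMESTAMP,
    enabled    => TRUE
  );
END;
/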
SHOW ERRORS

CREATE TABLE aq_admin.MESSAGE_DQ
(
  message_id   NUMBER,
  DQ_MESSAGE   VARCHAR2(1000)
) TABLESPACE USERS;

CREATE TABLE aq_admin.MESSAGE_EXCEPTIONS
(
  message_id    NUMBER,
  ORA_MESSAGE   VARCHAR2(1000)
) TABLESPACE USERS;
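
The dequeue procedure that follows reads a control table AQ_CONTROLLER and a sequence MESSAGE_SEQ that this post does not define. A minimal sketch of what they might look like, with column names taken from the procedure and the datatypes, sizes and starting row being assumptions:

```sql
-- Assumed definitions; only the column names come from the procedure below.
CREATE TABLE aq_admin.AQ_CONTROLLER
(
  QUEUE_OWNER  VARCHAR2(30),
  QUEUE_NAME   VARCHAR2(30),
  QUEUE_FLAG   VARCHAR2(10)    -- 'START' keeps the loop running, 'STOP' ends it
) TABLESPACE USERS;

-- The owner is stored in lowercase because the procedure compares against 'aq_admin'.
INSERT INTO aq_admin.AQ_CONTROLLER (QUEUE_OWNER, QUEUE_NAME, QUEUE_FLAG)
VALUES ('aq_admin', 'DEMO_QUEUE', 'START');

COMMIT;

-- Supplies the message_id values written to MESSAGE_DQ and MESSAGE_EXCEPTIONS.
CREATE SEQUENCE aq_admin.MESSAGE_SEQ START WITH 1 INCREMENT BY 1;
```
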


create or replace PROCEDURE PR_AQ_DEQUEUE
AS
     r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
     v_message_handle     RAW(16);
     o_payload            aq_admin.demo_queue_payload_type;
   
     v_queue_flag         VARCHAR2(10);
 --    v_exec_cntr          NUMBER := 0;
     v_message_id         NUMBER;
     v_error_msg          VARCHAR2(1000);
   
BEGIN


 
--    r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
--    WHILE v_queue_flag = 'START'
    LOOP --{
--        v_exec_cntr := v_exec_cntr + 1;
        SELECT QUEUE_FLAG
         INTO  v_queue_flag
         FROM AQ_CONTROLLER
         WHERE QUEUE_OWNER = 'aq_admin'
           AND QUEUE_NAME = 'DEMO_QUEUE';
     
       IF v_queue_flag = 'STOP'
       THEN
        EXIT;
       END IF;
     
--       IF v_exec_cntr = 5
--       THEN
--        EXIT;
--       END IF;
 
    dbms_output.put_line('v_queue_flag ' || v_queue_flag);
 
--    DBMS_LOCK.SLEEP(6);
    v_message_id := MESSAGE_SEQ.NEXTVAL;
 
    BEGIN
        o_payload   := aq_admin.demo_queue_payload_type(NULL);
        r_dequeue_options.wait := 5;
        DBMS_AQ.DEQUEUE(
          queue_name         => 'aq_admin.demo_queue',
          dequeue_options    => r_dequeue_options,
          message_properties => r_message_properties,
          payload            => o_payload,
          msgid              => v_message_handle
         );
    EXCEPTION
        WHEN OTHERS
          THEN
            IF SQLCODE = -25228
            THEN
                NULL;
--                dbms_output.put_line('SQLCODE ' || SQLCODE);
             
--                INSERT INTO aq_admin.MESSAGE_EXCEPTIONS(message_id, ORA_MESSAGE)
--                VALUES (v_message_id, v_error_msg);
             ELSE
                v_error_msg := SQLERRM;
                INSERT INTO aq_admin.MESSAGE_EXCEPTIONS(message_id, ORA_MESSAGE)
                VALUES (v_message_id, 'ERROR :: ' || v_error_msg);
                -- Commit before leaving the loop so the logged error is not left uncommitted.
                COMMIT;
                EXIT;
            END IF;
    END;
 
    IF o_payload.message IS NOT NULL
    THEN
      INSERT INTO MESSAGE_DQ(message_id, DQ_MESSAGE)
      VALUES ( v_message_id, o_payload.message);  
    END IF;

    COMMIT;
    DBMS_OUTPUT.PUT_LINE('*** Dequeued message is [' || o_payload.message || '] ***');
 
    END LOOP;
    RETURN;
END;
/
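
Since the procedure loops until the control row says STOP, a session can end it and inspect the results roughly as follows. This assumes AQ_CONTROLLER lives in the aq_admin schema and holds a row for DEMO_QUEUE, neither of which is shown in this post.

```sql
-- Ask the running procedure to stop; it notices the flag on its next pass,
-- after the 5 second dequeue wait at most.
UPDATE aq_admin.AQ_CONTROLLER
   SET QUEUE_FLAG = 'STOP'
 WHERE QUEUE_OWNER = 'aq_admin'
   AND QUEUE_NAME  = 'DEMO_QUEUE';

COMMIT;

-- Messages that were dequeued by the procedure
SELECT message_id, DQ_MESSAGE
FROM   aq_admin.MESSAGE_DQ
ORDER  BY message_id;

-- Errors other than the dequeue timeout (ORA-25228)
SELECT message_id, ORA_MESSAGE
FROM   aq_admin.MESSAGE_EXCEPTIONS
ORDER  BY message_id;
```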


Some of the common errors you may face are:
Make sure the queue table and the queue are created before adding a subscriber to the queue, and that the correct queue name is given when adding the subscriber.

If the db link name is invalid or the link has not been created, you will get an entry in "sys.aq$_schedules" with LAST_ERROR_MSG populated as follows
"ORA-02019: connection description for remote database not found"

If the destination queue owner is not correct, you will get an error in "sys.aq$_schedules"
"ORA-25205: the QUEUE <schema>.<queue_name> does not exist"

Make sure the destination queue names are correct when adding the subscriber and scheduling propagation.
If the destination queue does not exist or is not started, you will get an error message in
"DBA_QUEUE_SCHEDULES" -> LAST_ERROR_MSG
"ORA-25205: the QUEUE <schema>.<queue_name> does not exist".

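A query along these lines can be used to look for the propagation errors described above. It is only a sketch and assumes the session has access to the DBA views.

```sql
-- Propagation schedules that have recorded an error
SELECT schema,
       qname,
       destination,
       schedule_disabled,
       failures,
       last_error_date,
       last_error_msg
FROM   dba_queue_schedules
WHERE  last_error_msg IS NOT NULL;
```
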
If the local queue name is invalid, you will get a "queue does not exist" error when adding the subscriber.
ORA-24010: QUEUE AQDEMO_QUEUE3 does not exist
ORA-06512: at "SYS.DBMS_SYS_ERROR", line 86
ORA-06512: at "SYS.DBMS_AQADM_SYS", line 6228
ORA-06512: at "SYS.DBMS_AQADM", line 364
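
Before adding a subscriber, it may help to confirm that the local queue exists and is started. A quick check, assuming access to the DBA views and using the sample queue name from this post:

```sql
-- Queue names are stored in uppercase unless they were created with quoted identifiers.
SELECT owner,
       name,
       queue_table,
       enqueue_enabled,
       dequeue_enabled
FROM   dba_queues
WHERE  name = 'DEMO_QUEUE';
```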


Make sure job_queue_processes is set to a value high enough for the queued messages to be propagated.
When setting JOB_QUEUE_PROCESSES, DBAs should be aware that the required number is determined by the number of queues from which messages have to be propagated and the number of destinations (rather than queues) to which messages have to be propagated.
Refer to https://docs.oracle.com/cd/A97630_01/appdev.920/a96587/qmanage.htm#65558 to determine this value


Quote for the day
“You Are Never Too Old To Set Another Goal Or To Dream A New Dream.” - C.S. Lewis

