Advanced Queuing (AQ) is an Oracle feature that lets applications communicate over the network by putting messages into queues (enqueue) and getting messages from queues (dequeue). A message is stored in an object type variable, known as the payload, and can be transferred to remote databases over the network, where it is dequeued at the destination server. The payload type on the destination server must have the same structure as the type on the primary database.
Advantages of advanced queues -
Messages are not vulnerable to network, machine or application failures. Such messages are known as persistent messages. At worst, there may be a delay in propagating queued messages to the destination servers.
The enqueuing (producer) and dequeuing (consumer) programs can also run independently of each other. This is known as asynchronous communication.
After you create a queue, you attach a subscriber to it. A subscriber is an agent for which you supply details such as its name and address.
On the host or primary database you specify the subscriber name, the destination queue to which messages are propagated (reached through a database link) and the protocol used to transfer the messages. The subscriber on the primary database is called a propagator.
A similar subscriber must also be created on the remote database and assigned to the remote queue from which the messages are dequeued. For a given queue, the subscriber name here is the same as the one on the primary database. The subscriber on the remote database is also called the recipient or consumer.
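The primary-database side of this setup can be sketched roughly as follows. This is only an illustration under assumptions: the names demo_mc_queue_table, demo_mc_queue, DEMO_SUBSCRIBER and the database link remote_db are hypothetical, and the payload type is the demo_queue_payload_type created in the sample script later in this post. Subscribers can only be added to a queue whose queue table was created with multiple_consumers => TRUE.

```sql
-- Run as aq_admin on the primary database.
-- Hypothetical names: demo_mc_queue_table, demo_mc_queue, DEMO_SUBSCRIBER, remote_db.
BEGIN
  -- Subscribers require a multi-consumer queue table.
  DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table        => 'demo_mc_queue_table',
    queue_payload_type => 'demo_queue_payload_type',
    multiple_consumers => TRUE
  );
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'demo_mc_queue',
    queue_table => 'demo_mc_queue_table'
  );
  DBMS_AQADM.START_QUEUE (queue_name => 'demo_mc_queue');

  -- The agent address names the destination queue, reached through the database link.
  DBMS_AQADM.ADD_SUBSCRIBER (
    queue_name => 'demo_mc_queue',
    subscriber => SYS.AQ$_AGENT('DEMO_SUBSCRIBER',
                                'aq_admin.demo_mc_queue@remote_db',
                                NULL)
  );

  -- Schedule propagation to that database link; latency is given in seconds.
  DBMS_AQADM.SCHEDULE_PROPAGATION (
    queue_name  => 'demo_mc_queue',
    destination => 'remote_db',
    latency     => 10
  );
END;
/
```

The remote database needs a matching payload type, queue table and started queue before propagation can succeed.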
Users can assign a unique correlation identifier to a message and later retrieve the message using this identifier.
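As a rough sketch of how a correlation identifier is used, the block below tags a message on enqueue and then dequeues by the same identifier. It assumes the aq_admin.demo_queue queue and payload type from the sample script later in this post; the identifier value ORDER-1001 is made up for illustration.

```sql
DECLARE
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            aq_admin.demo_queue_payload_type;
BEGIN
  -- Tag the message with a correlation identifier (illustrative value).
  r_message_properties.correlation := 'ORDER-1001';
  o_payload := aq_admin.demo_queue_payload_type('Order 1001 created');
  DBMS_AQ.ENQUEUE(
    queue_name         => 'aq_admin.demo_queue',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );
  COMMIT;

  -- Dequeue only a message carrying that identifier; do not wait if none is found.
  r_dequeue_options.correlation := 'ORDER-1001';
  r_dequeue_options.wait        := DBMS_AQ.NO_WAIT;
  DBMS_AQ.DEQUEUE(
    queue_name         => 'aq_admin.demo_queue',
    dequeue_options    => r_dequeue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );
  DBMS_OUTPUT.PUT_LINE('*** Message for ORDER-1001 is [' || o_payload.message || '] ***');
  COMMIT;
END;
/
```

If no message matches the identifier, the NO_WAIT dequeue raises ORA-25228 instead of blocking.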
A single message in a queue can have multiple consumers. A consumer can also subscribe to multiple queues and receive messages from each of them.
There are 3 options for consuming messages: by enqueue order, by priority, or by a particular position relative to other messages.
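Of these options, priority ordering can be sketched as follows. The queue table is created with a sort list so that messages are ordered by priority first and enqueue time second. The names demo_prio_queue_table and demo_prio_queue are hypothetical, and the payload type is the one created in the sample script later in this post.

```sql
-- Run as aq_admin. demo_prio_queue_table and demo_prio_queue are hypothetical names.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table        => 'demo_prio_queue_table',
    queue_payload_type => 'demo_queue_payload_type',
    sort_list          => 'PRIORITY,ENQ_TIME'  -- order by priority, then enqueue time
  );
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'demo_prio_queue',
    queue_table => 'demo_prio_queue_table'
  );
  DBMS_AQADM.START_QUEUE (queue_name => 'demo_prio_queue');
END;
/
DECLARE
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
BEGIN
  -- A smaller number means a higher priority.
  r_message_properties.priority := 5;
  DBMS_AQ.ENQUEUE(
    queue_name         => 'aq_admin.demo_prio_queue',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => aq_admin.demo_queue_payload_type('Routine message'),
    msgid              => v_message_handle
  );
  r_message_properties.priority := 1;
  DBMS_AQ.ENQUEUE(
    queue_name         => 'aq_admin.demo_prio_queue',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => aq_admin.demo_queue_payload_type('Urgent message'),
    msgid              => v_message_handle
  );
  COMMIT;
END;
/
```

With this sort list, a plain dequeue is expected to hand out the priority 1 message before the priority 5 message, even though it was enqueued second.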
Retention - AQ administrators can specify a retention period, which is how long a message is kept in the queue table after it has been dequeued. How long an unconsumed message remains available to consumers is controlled separately, by the expiration property of the message.
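A minimal sketch of setting the retention period on an existing queue is shown below. It assumes the demo_queue queue from the sample script later in this post, and the value of one day is only illustrative.

```sql
-- Run as aq_admin. retention_time is expressed in seconds.
BEGIN
  DBMS_AQADM.ALTER_QUEUE (
    queue_name     => 'demo_queue',
    retention_time => 86400   -- 1 day; illustrative value
  );
END;
/
```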
Points to consider
The job_queue_processes value should be set appropriately, based on the number of queues in the database, the number of existing jobs, etc. A value of 0 means messages won't be propagated, and jobs that dequeue messages won't run.
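The current setting can be checked and changed from SQL*Plus roughly as follows. The value 10 is only an illustrative number, not a recommendation.

```sql
-- Run as a privileged user (for example SYS).
SHOW PARAMETER job_queue_processes

-- 10 is an illustrative value; size it for your own queues and jobs.
ALTER SYSTEM SET job_queue_processes = 10;
```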
.......To be continued
Below is a sample script for enqueuing and dequeuing messages on the same database
CONNECT sys/change_on_install as sysdba
---------------------------------------------------------
DROP USER aq_admin CASCADE;
CREATE USER aq_admin IDENTIFIED BY aq_admin
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;
ALTER USER aq_admin QUOTA UNLIMITED ON users;
GRANT aq_administrator_role TO aq_admin;
GRANT connect TO aq_admin;
GRANT create type TO aq_admin;
GRANT create sequence TO aq_admin;
GRANT CREATE JOB TO aq_admin;
GRANT CREATE PROCEDURE TO aq_admin;
GRANT CREATE TABLE TO aq_admin;
EXECUTE dbms_aqadm.grant_type_access('aq_admin');
-- -------------------------------------------------------
DROP USER aq_user CASCADE;
CREATE USER aq_user IDENTIFIED BY aq_user
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;
GRANT aq_user_role TO aq_user;
GRANT connect TO aq_user;
GRANT EXECUTE ON DBMS_AQ TO aq_user;
GRANT EXECUTE ON DBMS_AQ TO aq_admin;
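-- Run this grant only after PR_AQ_DEQUEUE has been created (it is defined at the end of this script).
GRANT EXECUTE ON aq_admin.PR_AQ_DEQUEUE TO aq_user;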
CONNECT aq_admin/aq_admin
CREATE TYPE demo_queue_payload_type AS OBJECT
( message VARCHAR2(4000) );
/
GRANT EXECUTE ON aq_admin.demo_queue_payload_type to aq_user;
GRANT ENQUEUE ANY QUEUE TO aq_user;
GRANT DEQUEUE ANY QUEUE TO aq_user;
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type'
);
END;
/
BEGIN
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);
DBMS_AQADM.START_QUEUE (
queue_name => 'demo_queue'
);
END;
/
SHOW ERRORS
SELECT object_name, object_type
FROM user_objects
WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';
DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq_admin.demo_queue_payload_type;
BEGIN
o_payload := aq_admin.demo_queue_payload_type('Here is a message 2');
DBMS_AQ.ENQUEUE(
queue_name => 'aq_admin.demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
COMMIT;
END;
/
SELECT COUNT(*) FROM aq$demo_queue_table;
SELECT user_data FROM aq$demo_queue_table;
DECLARE
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq_admin.demo_queue_payload_type;
BEGIN
r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'aq_admin.demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE('*** Browsed message is [' || o_payload.message || '] ***');
END;
/
DECLARE
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq_admin.demo_queue_payload_type;
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'aq_admin.demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'*** Dequeued message is [' || o_payload.message || '] ***');
COMMIT;
END;
/
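-- Create this job only after PR_AQ_DEQUEUE and the tables it uses (defined further down) exist,
-- because the job is enabled immediately.
BEGIN
  DBMS_SCHEDULER.CREATE_JOB (
    job_name   => 'AQ_JOB',
    job_type   => 'PLSQL_BLOCK',
    job_action => 'BEGIN PR_AQ_DEQUEUE; END;',
    start_date => SYSTIMESTAMP,
    enabled    => TRUE
  );
END;
/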
SHOW ERRORS
CREATE TABLE aq_admin.MESSAGE_DQ
(
message_id NUMBER,
DQ_MESSAGE VARCHAR2(1000)
) TABLESPACE USERS;
CREATE TABLE aq_admin.MESSAGE_EXCEPTIONS
(
message_id NUMBER,
ORA_MESSAGE VARCHAR2(1000)
) TABLESPACE USERS;
CREATE OR REPLACE PROCEDURE PR_AQ_DEQUEUE
AS
  r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            aq_admin.demo_queue_payload_type;
  v_queue_flag         VARCHAR2(10);
  v_message_id         NUMBER;
  v_error_msg          VARCHAR2(1000);
BEGIN
  LOOP
    -- Check the control table on every pass; a flag of 'STOP' ends the loop.
    SELECT QUEUE_FLAG
    INTO v_queue_flag
    FROM AQ_CONTROLLER
    WHERE QUEUE_OWNER = 'aq_admin'
    AND QUEUE_NAME = 'DEMO_QUEUE';
    IF v_queue_flag = 'STOP'
    THEN
      EXIT;
    END IF;
    dbms_output.put_line('v_queue_flag ' || v_queue_flag);
    v_message_id := MESSAGE_SEQ.NEXTVAL;
    BEGIN
      -- Reset the payload so that a timed-out dequeue leaves a NULL message.
      o_payload := aq_admin.demo_queue_payload_type(NULL);
      -- Wait up to 5 seconds for a message.
      r_dequeue_options.wait := 5;
      DBMS_AQ.DEQUEUE(
        queue_name         => 'aq_admin.demo_queue',
        dequeue_options    => r_dequeue_options,
        message_properties => r_message_properties,
        payload            => o_payload,
        msgid              => v_message_handle
      );
    EXCEPTION
      WHEN OTHERS
      THEN
        IF SQLCODE = -25228
        THEN
          -- ORA-25228: the dequeue timed out with no message available; try again.
          NULL;
        ELSE
          -- Log any other error, commit the log entry and leave the loop.
          v_error_msg := SQLERRM;
          INSERT INTO aq_admin.MESSAGE_EXCEPTIONS(message_id, ORA_MESSAGE)
          VALUES (v_message_id, 'ERROR :: ' || v_error_msg);
          COMMIT;
          EXIT;
        END IF;
    END;
    -- Store the message only when something was actually dequeued.
    IF o_payload.message IS NOT NULL
    THEN
      INSERT INTO MESSAGE_DQ(message_id, DQ_MESSAGE)
      VALUES (v_message_id, o_payload.message);
    END IF;
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('*** Dequeued message is [' || o_payload.message || '] ***');
  END LOOP;
END;
/
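PR_AQ_DEQUEUE reads from AQ_CONTROLLER and MESSAGE_SEQ, neither of which is created in the script above. A minimal sketch of what they might look like is given below. The column types and the initial row are assumptions inferred from how the procedure uses them, not the original definitions. They need to exist before the procedure is compiled.

```sql
-- Run as aq_admin before creating PR_AQ_DEQUEUE.
-- Assumed definitions, inferred from how the procedure reads these objects.
CREATE SEQUENCE aq_admin.message_seq START WITH 1 INCREMENT BY 1;

CREATE TABLE aq_admin.aq_controller
(
  queue_owner VARCHAR2(30),
  queue_name  VARCHAR2(30),
  queue_flag  VARCHAR2(10)
) TABLESPACE users;

-- One control row per queue; the values match the WHERE clause in PR_AQ_DEQUEUE.
INSERT INTO aq_admin.aq_controller (queue_owner, queue_name, queue_flag)
VALUES ('aq_admin', 'DEMO_QUEUE', 'START');
COMMIT;

-- To stop the dequeue loop, flip the flag; the procedure exits on its next pass.
UPDATE aq_admin.aq_controller
SET    queue_flag = 'STOP'
WHERE  queue_owner = 'aq_admin'
AND    queue_name  = 'DEMO_QUEUE';
COMMIT;
```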
Some of the common errors you may face are
Make sure the queue table and queue are created before adding a subscriber to the queue, and that the correct queue name is given when adding the subscriber.
If the db link name is invalid or the link has not been created, you will get an entry in "sys.aq$_schedules" with LAST_ERROR_MSG populated as follows
"ORA-02019: connection description for remote database not found"
If the destination queue owner is not correct, you will get an error in "sys.aq$_schedules"
"ORA-25205: the QUEUE <schema>.<queue_name> does not exist"
Make sure the destination queue names are correct when adding the subscriber and scheduling propagation.
If the destination queue does not exist or is not started, you will get an error message in
"DBA_QUEUE_SCHEDULES" -> LAST_ERROR_MSG
"ORA-25205: the QUEUE <schema>.<queue_name> does not exist".
If the local queue_name is invalid, you will get a "queue does not exist" error when adding the subscriber.
ORA-24010: QUEUE AQDEMO_QUEUE3 does not exist
ORA-06512: at "SYS.DBMS_SYS_ERROR", line 86
ORA-06512: at "SYS.DBMS_AQADM_SYS", line 6228
ORA-06512: at "SYS.DBMS_AQADM", line 364
Make sure job_queue_processes is set to a value sufficient for the queued messages to get propagated.
When setting JOB_QUEUE_PROCESSES, DBAs should be aware that the required number is determined by the number of queues from which messages have to be propagated and the number of destinations (rather than queues) to which messages have to be propagated.
Refer to https://docs.oracle.com/cd/A97630_01/appdev.920/a96587/qmanage.htm#65558 to determine this value
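The propagation errors described above can be looked up with a query along these lines. It is only a sketch and has to be run by a user who can read the DBA views.

```sql
-- List propagation schedules that have recorded an error.
SELECT schema, qname, destination, schedule_disabled, failures,
       last_error_date, last_error_time, last_error_msg
FROM   dba_queue_schedules
WHERE  last_error_msg IS NOT NULL;
```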
Quote for the day
“You Are Never Too Old To Set Another Goal Or To Dream A New Dream.” - C.S. Lewis