Navigation Bar

Sunday, August 28, 2016

Oracle Advanced Queues: Propagation of data between two databases

To propagate data between two databases, you have to add a subscriber to the local queue. In the subscriber's agent details you provide the database link to the remote database and the name of the queue on the remote database from which the messages will be dequeued.

On the remote database, the remote queue gets a subscriber with the same name as the subscriber defined on the local database.

The dequeue procedure created on the remote database is then registered to be notified of messages arriving on the remote queue, so that it can dequeue them.

Below is the detailed set of scripts for an Advanced Queuing setup. It has been tested on an 11g environment (11.2.0.1).

Create the queue admin users on both the local and remote databases.

-- Run as SYSDBA: (re)create the AQ administration account.
CONNECT sys/change_on_install AS SYSDBA

-- Remove any previous copy of the account together with the objects it owns.
DROP USER aq_admin CASCADE;

-- Create the account with unlimited quota on its default tablespace.
CREATE USER aq_admin
  IDENTIFIED BY aq_admin
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp
  QUOTA UNLIMITED ON users;

-- Roles.
GRANT aq_administrator_role, connect TO aq_admin;

-- System privileges for the objects created by the scripts that follow.
GRANT CREATE TYPE, CREATE SEQUENCE, CREATE JOB, CREATE PROCEDURE, CREATE TABLE
  TO aq_admin;

-- Object privilege on the AQ operational package.
GRANT EXECUTE ON dbms_aq TO aq_admin;

-- Let aq_admin create queue tables based on object types.
BEGIN
  dbms_aqadm.grant_type_access('aq_admin');
END;
/

Additional Grant on the local database
-- Allows aq_admin to create the database link (ln_uat_dev) to the remote database.
GRANT CREATE DATABASE LINK TO AQ_ADMIN;

Steps on the local database


Connect as aq_admin user and create the database link between local and remote databases and make sure the link is working. Test it by accessing a table from the remote database.

-- Create the link from the local database to the remote one, connecting as the
-- remote aq_admin user. '11gdev' is presumably a connect identifier (for example
-- a tnsnames.ora alias) that resolves to the remote database -- adjust as needed.
CREATE DATABASE LINK ln_uat_dev
  CONNECT TO aq_admin IDENTIFIED BY aq_admin
  USING '11gdev';

-- Test the link. DUAL is used because it exists in every Oracle database, so the
-- check does not depend on any application table being present on the remote side.
SELECT dummy FROM dual@ln_uat_dev; -- This should return one row.

-- Payload type carried by the queue messages. The type created in the other
-- database must have an identical structure; also pay attention to the
-- character sets of both databases.
CREATE OR REPLACE TYPE test_type AS OBJECT (
  message CLOB
);
/

DECLARE
  -- Local queue and the queue table that backs it.
  c_queue_table  CONSTANT VARCHAR2(30) := 'aqdemo_queue1_t';
  c_queue        CONSTANT VARCHAR2(30) := 'aqdemo_queue1';
  -- Queue on the remote database that receives the propagated messages.
  c_remote_queue CONSTANT VARCHAR2(61) := 'aq_admin.aqdemo_queue2';
  -- Agent for the remote subscriber: its address is the remote queue reached
  -- through the database link.
  l_remote_subscriber sys.aq$_agent :=
    sys.aq$_agent(name     => 'aqdemo_queue1_subscriber',
                  address  => 'aq_admin.aqdemo_queue2@ln_uat_dev',
                  protocol => 0);
BEGIN
  -- 1. Multi-consumer queue table holding payloads of the type defined before.
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => c_queue_table,
    queue_payload_type => 'aq_admin.TEST_TYPE',
    multiple_consumers => TRUE);

  -- 2. The test queue whose messages are going to be propagated.
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => c_queue,
    queue_table => c_queue_table);

  -- 3. Enable the queue for enqueuing and dequeuing.
  DBMS_AQADM.START_QUEUE(queue_name => c_queue);

  -- 4. Subscribe the remote queue (queue-to-queue).
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name     => c_queue,
    subscriber     => l_remote_subscriber,
    queue_to_queue => TRUE);

  -- 5. Schedule propagation over the database link to the remote queue.
  DBMS_AQADM.SCHEDULE_PROPAGATION(
    queue_name        => 'aq_admin.aqdemo_queue1',
    latency           => 0,
    destination       => 'ln_uat_dev',
    destination_queue => c_remote_queue);
END;
/

Steps on the remote database

-- Payload type carried by the queue messages. The type created in the other
-- database must have an identical structure; also pay attention to the
-- character sets of both databases.
CREATE OR REPLACE TYPE test_type AS OBJECT (
  message CLOB
);
/

DECLARE
  -- Remote queue and the queue table that backs it.
  c_queue_table CONSTANT VARCHAR2(30) := 'aqdemo_queue2_t';
  c_queue       CONSTANT VARCHAR2(30) := 'aqdemo_queue2';
BEGIN
  -- 1. Multi-consumer queue table holding payloads of the type defined before.
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => c_queue_table,
    queue_payload_type => 'aq_admin.TEST_TYPE',
    multiple_consumers => TRUE);

  -- 2. The queue that receives the messages coming from the queue on the
  --    LOCAL database.
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => c_queue,
    queue_table => c_queue_table);

  -- 3. Enable the queue for enqueuing and dequeuing.
  DBMS_AQADM.START_QUEUE(queue_name => c_queue);
END;
/
-- Landing table for the received messages. RECEIVED defaults to the timestamp
-- at which the row is inserted.
CREATE TABLE aqdemo_queue2_message_t (
  received TIMESTAMP DEFAULT SYSTIMESTAMP,
  message  CLOB
);
-- Callback procedure that dequeues the received message and saves it.
--
-- It dequeues exactly the message identified by the descriptor it is given and
-- stores the message text in aqdemo_queue2_message_t.
--
-- Parameters:
--   context  - not used by this procedure.
--   reginfo  - not used by this procedure.
--   descr    - descriptor of the message; supplies the queue name, the consumer
--              name and the message id used for the dequeue.
--   payload  - not used by this procedure (the payload is obtained by dequeuing).
--   payloadl - not used by this procedure.
--
-- Errors: on any failure the transaction is rolled back (undoing the dequeue
-- and the insert) and the original exception is re-raised so that the failure
-- is reported instead of being silently discarded.
create or replace
procedure aqdemo_queue2_dequeue
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload test_type;
begin
  -- Dequeue the specific message, on behalf of the specific consumer, that the
  -- descriptor refers to.
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name, 
                  dequeue_options => r_dequeue_options, 
                  message_properties => r_message_properties, 
                  payload => o_payload, 
                  msgid => v_message_handle);
  -- RECEIVED is left to its column default.
  insert into aqdemo_queue2_message_t 
    (message) 
    values (o_payload.message);
  -- Make the dequeue and the insert permanent together.
  commit;
exception
  when others then
    -- Undo the dequeue and the insert, then propagate the error rather than
    -- swallowing it.
    rollback;
    raise;
end;
/

DECLARE
  -- The subscriber carries the same name as the one defined on the local database.
  l_subscriber sys.aq$_agent :=
    sys.aq$_agent('aqdemo_queue1_subscriber', NULL, NULL);

  -- Single registration tying "<queue>:<subscriber>" to the PL/SQL callback.
  l_registrations sys.aq$_reg_info_list :=
    sys.aq$_reg_info_list(
      sys.aq$_reg_info(
        'aq_admin.aqdemo_queue2:aqdemo_queue1_subscriber',
        dbms_aq.namespace_aq,
        'plsql://aq_admin.aqdemo_queue2_dequeue',
        hextoraw('FF')));
BEGIN
  -- Add the subscriber to the remote queue.
  dbms_aqadm.add_subscriber(
    queue_name => 'aq_admin.aqdemo_queue2',
    subscriber => l_subscriber);

  -- Register the procedure for dequeuing the messages received.
  dbms_aq.register(l_registrations, 1);
END;
/

Sample script to test enqueueing of messages

-- Enqueues 100 numbered test messages on the local queue in one transaction.
declare
  enq_msgid raw(16);                      -- id returned for each enqueued message
  eopt dbms_aq.enqueue_options_t;         -- default enqueue options
  mprop dbms_aq.message_properties_t;     -- properties shared by all the messages
begin
  mprop.priority := 1;
  -- Uncomment to set an expiration on the messages:
--  mprop.EXPIRATION := 3;
  for i in 1..100
  loop
    -- The counter is converted and zero-padded explicitly (001 .. 100).
    dbms_aq.enqueue(queue_name => 'aqdemo_queue1', 
                    enqueue_options => eopt, 
                    message_properties => mprop, 
                    payload => test_type('This is a remote queue test message #: ' || 
                                         lpad(to_char(i), 3, '0')), 
                    msgid => enq_msgid);
  end loop;
  -- The messages only become visible once the transaction is committed.
  commit;
end;
/

To check messages dequeued

-- List the stored messages, oldest first.
SELECT received,
       message
FROM aqdemo_queue2_message_t
ORDER BY received;


Cleanup scripts

For local queue objects

-- Undo the setup in reverse order. Each step is a separate statement so that
-- one failing step (e.g. an object that is already gone) does not prevent the
-- remaining ones from running.

-- Stop propagating to the remote queue and remove the remote subscriber that
-- were created during setup.
execute dbms_aqadm.unschedule_propagation(queue_name => 'aq_admin.aqdemo_queue1', destination => 'ln_uat_dev', destination_queue => 'aq_admin.aqdemo_queue2');
execute dbms_aqadm.remove_subscriber(queue_name => 'aqdemo_queue1', subscriber => sys.aq$_agent(name => 'aqdemo_queue1_subscriber', address => 'aq_admin.aqdemo_queue2@ln_uat_dev', protocol => 0));

-- A queue has to be stopped before it can be dropped.
execute dbms_aqadm.stop_queue(queue_name => 'aqdemo_queue1');
execute dbms_aqadm.drop_queue(queue_name => 'aqdemo_queue1');
execute dbms_aqadm.drop_queue_table(queue_table => 'aqdemo_queue1_t');

-- The payload type can be dropped once the queue table that uses it is gone.
drop type test_type;

For remote queue objects

-- Undo the setup in reverse order.

-- Remove the notification registration first, while the callback it points to
-- still exists. The registration entry is the same one used when registering.
begin
  dbms_aq.unregister (sys.aq$_reg_info_list(sys.aq$_reg_info('aq_admin.aqdemo_queue2:aqdemo_queue1_subscriber', 
                                                             dbms_aq.namespace_aq, 'plsql://aq_admin.aqdemo_queue2_dequeue', 
                                                             hextoraw('FF'))), 
                      1);
end;
/

-- Remove the subscriber that was added to the remote queue.
execute dbms_aqadm.remove_subscriber(queue_name => 'aq_admin.aqdemo_queue2', subscriber => sys.aq$_agent('aqdemo_queue1_subscriber', NULL, NULL));

drop procedure aqdemo_queue2_dequeue;

-- A queue has to be stopped before it can be dropped.
execute dbms_aqadm.stop_queue(queue_name => 'aqdemo_queue2');
execute dbms_aqadm.drop_queue(queue_name => 'aqdemo_queue2');
execute dbms_aqadm.drop_queue_table(queue_table => 'aqdemo_queue2_t');

drop table aqdemo_queue2_message_t;
-- The payload type can be dropped once the queue table that uses it is gone.
drop type test_type;


References
http://fcosfc.wordpress.com

No comments:

Post a Comment