To propagate data between two databases you have to add a subscriber to the queue. In the subscriber agent details you provide the details of the remote database and the queue on the remote database from which the messages are to be dequeued.
On the remote database, the subscriber to the remote queue will be the subscriber on the local database.
The dequeue script that is created is then registered to listen for messages on the remote queue for dequeueing.
Below is the detailed set of scripts for an advanced queue setup. This has been tested on an 11g environment (11.2.0.1).
-- Recreate the AQ administration account from scratch.
DROP USER aq_admin CASCADE;
-- The quota on the default tablespace is declared together with the account.
CREATE USER aq_admin
    IDENTIFIED BY aq_admin
    DEFAULT TABLESPACE users
    TEMPORARY TABLESPACE temp
    QUOTA UNLIMITED ON users;
-- Queue administration role and session access.
GRANT AQ_ADMINISTRATOR_ROLE TO aq_admin;
GRANT CONNECT TO aq_admin;
-- Object privileges required by the scripts that follow.
GRANT CREATE TYPE TO aq_admin;
GRANT CREATE SEQUENCE TO aq_admin;
GRANT CREATE JOB TO aq_admin;
GRANT CREATE PROCEDURE TO aq_admin;
GRANT CREATE TABLE TO aq_admin;
On the remote database, the subscriber to the remote queue will be the subscriber on the local database.
The dequeue script that is created is then registered to listen for messages on the remote queue for dequeueing.
Below is the detailed set of scripts for an advanced queue setup. This has been tested on an 11g environment (11.2.0.1).
Create the queue admin users on both the local and remote databases.
-- Creates the queue administrator account. Run as SYSDBA.
-- CONNECT is a SQL*Plus command and must sit on its own line, separate
-- from the DROP USER statement that follows it.
CONNECT sys/change_on_install AS SYSDBA
-- Start from a clean account (reports an error on the first run, when the user does not exist yet).
DROP USER aq_admin CASCADE;
CREATE USER aq_admin IDENTIFIED BY aq_admin
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;
ALTER USER aq_admin QUOTA UNLIMITED ON users;
-- Queue administration role and session access.
GRANT aq_administrator_role TO aq_admin;
GRANT connect TO aq_admin;
-- Privileges for the objects created later: payload type, job, callback procedure, message table.
GRANT create type TO aq_admin;
GRANT create sequence TO aq_admin;
GRANT CREATE JOB TO aq_admin;
GRANT CREATE PROCEDURE TO aq_admin;
GRANT CREATE TABLE TO aq_admin;
-- Direct execute grant on the enqueue/dequeue API.
GRANT EXECUTE ON DBMS_AQ TO aq_admin;
-- One call is enough; the second identical call was redundant.
EXECUTE dbms_aqadm.grant_type_access('aq_admin');
Additional Grant on the local database
-- Lets aq_admin create the database link that points at the remote database.
GRANT CREATE DATABASE LINK TO aq_admin;
Connect as aq_admin user and create the database link between local and remote databases and make sure the link is working. Test it by accessing a table from the remote database.
-- Database link from the local database to the remote one.
CREATE DATABASE LINK ln_uat_dev
    CONNECT TO aq_admin IDENTIFIED BY aq_admin
    USING '11gdev';
-- Test the link: this query against a remote table should give an output.
SELECT * FROM t1@ln_uat_dev;
Steps on the local database
Connect as aq_admin user and create the database link between local and remote databases and make sure the link is working. Test it by accessing a table from the remote database.
-- Database link used to reach the remote database.
CREATE DATABASE LINK ln_uat_dev
    CONNECT TO aq_admin IDENTIFIED BY aq_admin
    USING '11gdev';
-- Test the link: selecting from a remote table should give an output.
SELECT * FROM t1@ln_uat_dev;
-- Payload type for the queue messages. Its structure must be the same as
-- that of the type created in the other database. Be careful with the
-- character sets of both databases.
CREATE OR REPLACE TYPE test_type AS OBJECT (
    message CLOB
);
/
DECLARE
    -- Agent describing the subscriber; its address is the remote queue
    -- reached through the ln_uat_dev database link.
    l_remote_agent sys.aq$_agent :=
        sys.aq$_agent(name     => 'aqdemo_queue1_subscriber',
                      address  => 'aq_admin.aqdemo_queue2@ln_uat_dev',
                      protocol => 0);
BEGIN
    -- Queue table for messages of the payload type, with multiple consumers enabled.
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'aqdemo_queue1_t',
        queue_payload_type => 'aq_admin.TEST_TYPE',
        multiple_consumers => TRUE
    );

    -- The test queue that receives the messages to be propagated.
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => 'aqdemo_queue1',
        queue_table => 'aqdemo_queue1_t'
    );

    -- Enable the queue for enqueuing and dequeuing.
    DBMS_AQADM.START_QUEUE(queue_name => 'aqdemo_queue1');

    -- Add the remote subscriber (queue-to-queue).
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'aqdemo_queue1',
        subscriber     => l_remote_agent,
        queue_to_queue => TRUE
    );

    -- Start propagating messages to the remote queue with zero latency.
    DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name        => 'aq_admin.aqdemo_queue1',
        latency           => 0,
        destination       => 'ln_uat_dev',
        destination_queue => 'aq_admin.aqdemo_queue2'
    );
END;
/
Steps on the remote database
-- Message payload type. It has to have the same structure as the type
-- created in the other database. Be careful with the character sets of
-- both databases.
CREATE OR REPLACE TYPE test_type AS OBJECT (
    message CLOB
);
/
BEGIN
    -- Queue table for messages of the payload type, with multiple consumers enabled.
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'aqdemo_queue2_t',
        queue_payload_type => 'aq_admin.TEST_TYPE',
        multiple_consumers => TRUE
    );

    -- The test queue that receives the messages coming from the queue on the LOCAL database.
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => 'aqdemo_queue2',
        queue_table => 'aqdemo_queue2_t'
    );

    -- Enable the queue for enqueuing and dequeuing.
    DBMS_AQADM.START_QUEUE(queue_name => 'aqdemo_queue2');
END;
/
-- Table that stores each received message together with its arrival time.
CREATE TABLE aqdemo_queue2_message_t (
    received TIMESTAMP DEFAULT SYSTIMESTAMP,
    message  CLOB
);
-- Callback procedure that dequeues the message it is notified about and
-- saves its text into aqdemo_queue2_message_t.
--
-- Parameters (only descr is read by this procedure):
--   context  - raw context value passed to the callback
--   reginfo  - registration information
--   descr    - descriptor supplying the message id, consumer name and queue name
--   payload  - raw payload passed to the callback
--   payloadl - length value passed alongside payload
--
-- On any error the transaction is rolled back and the exception is re-raised,
-- so the failure is reported instead of being silently discarded.
create or replace
procedure aqdemo_queue2_dequeue
(
    context  raw,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  raw,
    payloadl number
)
as
    r_dequeue_options    dbms_aq.dequeue_options_t;
    r_message_properties dbms_aq.message_properties_t;
    v_message_handle     raw(26);
    o_payload            test_type;
begin
    -- Dequeue exactly the message named in the descriptor, as the consumer
    -- named in the descriptor.
    r_dequeue_options.msgid         := descr.msg_id;
    r_dequeue_options.consumer_name := descr.consumer_name;

    dbms_aq.dequeue(queue_name         => descr.queue_name,
                    dequeue_options    => r_dequeue_options,
                    message_properties => r_message_properties,
                    payload            => o_payload,
                    msgid              => v_message_handle);

    -- The received column is filled in by its SYSTIMESTAMP default.
    insert into aqdemo_queue2_message_t
        (message)
    values (o_payload.message);

    -- Commit the dequeue and the insert together.
    commit;
exception
    when others then
        -- Undo the dequeue and the insert, then propagate the error.
        rollback;
        raise;
end;
/
BEGIN
    -- Add the subscriber to the remote queue. Note that the subscriber name
    -- is the one defined for the local database.
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'aq_admin.aqdemo_queue2',
        subscriber => sys.aq$_agent(name     => 'aqdemo_queue1_subscriber',
                                    address  => NULL,
                                    protocol => NULL)
    );

    -- Register the PL/SQL procedure for dequeuing the messages received
    -- on this queue for that subscriber.
    DBMS_AQ.REGISTER(
        sys.aq$_reg_info_list(
            sys.aq$_reg_info(
                'aq_admin.aqdemo_queue2:aqdemo_queue1_subscriber',
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://aq_admin.aqdemo_queue2_dequeue',
                HEXTORAW('FF')
            )
        ),
        1
    );
END;
/
Sample script to test enqueueing of messages
-- Enqueues 100 numbered test messages into aqdemo_queue1 and commits them.
declare
    enq_msgid raw(16);
    eopt      dbms_aq.enqueue_options_t;
    mprop     dbms_aq.message_properties_t;
begin
    mprop.priority := 1;
    -- Optional: uncomment to set an expiration on the messages.
    -- mprop.EXPIRATION := 3;
    for i in 1..100
    loop
        dbms_aq.enqueue(queue_name         => 'aqdemo_queue1',
                        enqueue_options    => eopt,
                        message_properties => mprop,
                        -- Explicit conversions instead of relying on implicit
                        -- number-to-string casts; zero-pads the counter to 3 digits.
                        payload            => test_type('This is a remote queue test message #: ' ||
                                                        lpad(to_char(i), 3, '0')),
                        msgid              => enq_msgid);
    end loop;
    commit;
end;
/
To check messages dequeued
-- List the stored messages in the order they were received.
SELECT
    received,
    message
FROM aqdemo_queue2_message_t
ORDER BY received;
Cleanup scripts: for local queue objects
-- Tear down in order: stop the queue, drop it, drop its queue table,
-- and finally drop the payload type.
EXECUTE DBMS_AQADM.STOP_QUEUE(queue_name => 'aqdemo_queue1');
EXECUTE DBMS_AQADM.DROP_QUEUE(queue_name => 'aqdemo_queue1');
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'aqdemo_queue1_t');
DROP TYPE test_type;
For remote queue objects
-- Remove the callback procedure first.
DROP PROCEDURE aqdemo_queue2_dequeue;
-- Stop the queue, drop it, then drop its queue table.
EXECUTE DBMS_AQADM.STOP_QUEUE(queue_name => 'aqdemo_queue2');
EXECUTE DBMS_AQADM.DROP_QUEUE(queue_name => 'aqdemo_queue2');
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'aqdemo_queue2_t');
-- Finally drop the message table and the payload type.
DROP TABLE aqdemo_queue2_message_t;
DROP TYPE test_type;
References
http://fcosfc.wordpress.com
No comments:
Post a Comment