Oracle Advanced Queuing (AQ) demo
Advanced Queuing (AQ) has been available for several versions of Oracle.
It provides database-integrated message queuing functionality, which is built on top of Oracle Streams and leverages the functions of Oracle Database so that messages can be stored persistently, propagated between queues on different computers and databases, and transmitted using Oracle Net Services and HTTP(S). (For more information, refer to the Oracle documentation.)
The following shows an example of the use of Oracle Database Advanced Queuing for implementing a publish/subscribe relationship between publisher Application A and subscribers App.C1, App.C2, and App.C4:
This article shows how to schedule queue propagation from Oracle Database 18c to Oracle Database 12c using a database link.
It presents all the steps to follow to ensure that the demo runs successfully. We have a target environment on which Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 – 64bit Production is installed and a source environment on which Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 – Production Version 18.3.0.0.0 is installed.
All we need to run this demo is a configured pluggable database in both the source and target environments.
• We have two remote subscribers, Irina and Nastasia, in the Oracle Database 12c environment, and an AQ_DEV user in both the source and target environments.
• The subscriber Nastasia is added with a transformation on the payload, while no transformation is specified for Irina.
• The same list of messages, containing salary amounts in Canadian dollars, is enqueued for both Irina and Nastasia.
• When the messages are dequeued for the Nastasia subscriber, all salary amounts are converted to euros by the transformation.
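The scenario in the list above can be pictured with a small in-memory model. This is only a sketch in Python, not Oracle AQ itself: the `ToyQueue` class, the `Message` fields, and the `cad_to_eur` helper are hypothetical names introduced for illustration, and the 0.68 rate is the one used later in this demo's can_2_euro_salary function.

```python
from collections import deque
from dataclasses import dataclass, replace
from decimal import Decimal, ROUND_DOWN


@dataclass(frozen=True)
class Message:
    sender_id: int
    text: str
    salary: Decimal  # amount in Canadian dollars when enqueued


class ToyQueue:
    """In-memory stand-in for a multi-consumer queue (illustration only)."""

    def __init__(self):
        self._pending = {}     # subscriber name -> deque of waiting messages
        self._transforms = {}  # subscriber name -> optional transformation

    def add_subscriber(self, name, transformation=None):
        self._pending[name] = deque()
        self._transforms[name] = transformation

    def enqueue(self, message, recipient):
        self._pending[recipient].append(message)

    def dequeue(self, consumer):
        """Return the next message for the consumer, or None when empty."""
        if not self._pending[consumer]:
            return None
        message = self._pending[consumer].popleft()
        transform = self._transforms[consumer]
        return transform(message) if transform else message


def cad_to_eur(message):
    # Assumed rate of 0.68, truncated to two decimals, text upper-cased.
    euros = (message.salary * Decimal("0.68")).quantize(
        Decimal("0.01"), rounding=ROUND_DOWN
    )
    return replace(message, text=message.text.upper(), salary=euros)


queue = ToyQueue()
queue.add_subscriber("Irina")                    # no transformation
queue.add_subscriber("Nastasia", cad_to_eur)     # transformation attached
msg = Message(1, "Trenna Rajs", Decimal("3500"))
queue.enqueue(msg, "Irina")
queue.enqueue(msg, "Nastasia")
print(queue.dequeue("Irina").salary)     # unchanged amount
print(queue.dequeue("Nastasia").salary)  # converted amount
```

The point of the model is that both subscribers receive the same payload, and only the subscriber registered with a transformation sees converted values.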
- step 1 : Create a database link.
- Condition 1: Connect as sysdba in the source environment.
- Code Listing 1:
<oracle_home>\bin>sqlplus / as sysdba;
SQL*Plus: Release 18.0.0.0.0 - Production on Sat Jun 8 13:33:33 2019
Version 18.3.0.0.0
Copyright (c) 1982, 2018, Oracle. All rights reserved.
Connected to:
Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 - Production
Version 18.3.0.0.0
- Condition 2: Verify the status of the PDB in the source environment.
- Code Listing 2:
COLUMN name FORMAT a20;
COLUMN open_mode FORMAT a20;
SELECT name, open_mode
FROM v$pdbs
where name = 'ORAPLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               MOUNTED
- Condition 3: Open the ORAPLG pdb in read/write mode.
- Code Listing 3:
alter pluggable database oraplg open read write
/
Pluggable database altered.
select name, open_mode
from V$PDBS
where name = 'ORAPLG'
/
NAME OPEN_MODE
-------------------- --------------------
ORAPLG READ WRITE
- Condition 4: Verify on which database we are connected to, pluggable or container database.
- Code Listing 4:
COLUMN user FORMAT a10;
COLUMN con_name FORMAT a10;
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER       CON_NAME
---------- ----------
SYS        CDB$ROOT
- Condition 5: Connect to PDB instead of CDB.
- Code Listing 5:
alter session set container=ORAPLG
/
Session altered.
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER       CON_NAME
---------- ----------
SYS        ORAPLG
- Code Listing 6: Create public database link named LINK_TO_HR12
CREATE PUBLIC DATABASE LINK LINK_TO_HR12
  CONNECT TO "SYSTEM" IDENTIFIED BY PassXxXx
  USING '(DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = linecode)(PORT = 1522))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = ORA12PLG)
    )
  )'
/
Database link created.
- step 2 : Set up the target environment.
- Code Listing 7:
<oracle_home>\bin>sqlplus / as sysdba;
SQL*Plus: Release 12.2.0.1.0 Production on Sat Jun 8 14:06:29 2019
Copyright (c) 1982, 2016, Oracle. All rights reserved.
Connected to:
Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production
- Condition 2: Verify the status of the ORA12PLG PDB.
- Code Listing 8:
COLUMN name FORMAT a20;
COLUMN open_mode FORMAT a20;
select name, open_mode
from V$PDBS
where name = 'ORA12PLG'
/
NAME OPEN_MODE
-------------------- --------------------
ORA12PLG MOUNTED
SQL>
- Condition 3: Open ORA12PLG pdb in read/write mode.
- Code Listing 9:
alter pluggable database ora12plg open;
Pluggable database altered.
select name, open_mode
from V$PDBS
where name = 'ORA12PLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORA12PLG             READ WRITE
- Condition 4: Verify on which database we are connected to, pluggable or container database.
- Code Listing 10:
column user format A20;
column con_name format A20;
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  CDB$ROOT
- Condition 5: Connect to the PDB instead of the CDB.
- Code Listing 11:
alter session set container=ORA12PLG
/
Session altered.
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  ORA12PLG
- Code Listing 12: Provide a name for the tablespace, ex. aq12c_tbs.
ACCEPT tbs12c_name CHAR PROMPT 'Enter the tablespace''s name in ora12plg : ';
Enter the tablespace's name in ora12plg : aq12c_tbs
- Code Listing 13: Create aq12c_tbs tablespace.
set serveroutput on;
set verify off;
DECLARE
  vcount_adm_tbs INTEGER := 0;
  v_plsql        VARCHAR2(500);
  v_tbs_path     VARCHAR2(100);
  v_tbs_name     VARCHAR2(20) := upper('&tbs12c_name');
BEGIN
  SELECT COUNT(1)
  INTO vcount_adm_tbs
  FROM v$tablespace
  WHERE name = upper(v_tbs_name);
  IF vcount_adm_tbs != 0 THEN
    EXECUTE IMMEDIATE ( 'drop tablespace ' || upper(v_tbs_name) || ' including contents and datafiles cascade constraints' );
  END IF;
  SELECT regexp_replace(ddf.file_name, '.[[:alnum:]]+', '', instr(ddf.file_name, '\', - 1))
  INTO v_tbs_path
  FROM dba_data_files ddf
  WHERE ROWNUM < 2;
  v_plsql := 'create tablespace ' || v_tbs_name
    || ' datafile ''' || v_tbs_path || '\' || v_tbs_name
    || '.DBF'' SIZE 5M REUSE EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO';
  EXECUTE IMMEDIATE v_plsql;
  dbms_output.put_line('Tablespace '||v_tbs_name|| ' successfully created!.') ;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while dropping/creating tablespace ( '||v_tbs_name||' )' || sqlerrm);
END;
/
Tablespace AQ12C_TBS successfully created!.
PL/SQL procedure successfully completed.
- step 2.2.2: Drop the AQ_DEV user, including its database objects, if it already exists.
- Code Listing 13: Drop the AQ_DEV user if it already exists.
set serveroutput on;
DECLARE
  vcount_aq_dev INTEGER := 0;
BEGIN
  SELECT COUNT(1)
  INTO vcount_aq_dev
  FROM dba_users
  WHERE username = 'AQ_DEV';
  IF vcount_aq_dev != 0 THEN
    EXECUTE IMMEDIATE ( 'DROP USER AQ_DEV CASCADE' );
    dbms_output.put_line('AQ_DEV user dropped');
  ELSE
    dbms_output.put_line('AQ_DEV user not found');
  END IF;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while dropping user : ' || sqlerrm);
END;
/
AQ_DEV user dropped
PL/SQL procedure successfully completed.
- step 2.2.3: Create the AQ_DEV user with the necessary privileges in the ora12plg PDB.
- Code Listing 14: Provide password for AQ_DEV user.
ACCEPT pass_aq_dev CHAR PROMPT 'Enter the password for AQ_DEV user : ' HIDE;
Enter the password for aq_dev :
- Code Listing 15: Create the AQ_DEV user with the following privileges.
CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
DEFAULT TABLESPACE &tbs12c_name;
GRANT dba, CREATE ANY TYPE TO aq_dev;
GRANT CREATE ANY TYPE TO aq_dev;
GRANT aq_administrator_role TO aq_dev;
GRANT CREATE SESSION TO aq_dev;
GRANT connect TO aq_dev;
GRANT CREATE TYPE TO aq_dev;
GRANT EXECUTE ON dbms_aq TO aq_dev
/
SQL> CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
  2  DEFAULT TABLESPACE &tbs12c_name;
User created.
SQL> GRANT dba, CREATE ANY TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE ANY TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT aq_administrator_role TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE SESSION TO aq_dev;
Grant succeeded.
SQL> GRANT connect TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT EXECUTE ON dbms_aq TO aq_dev
  2  /
Grant succeeded.
SQL>
- step 2.2.4: Connect as the AQ_DEV user to the ORA12PLG PDB and set up the administrative AQ types.
- Code Listing 15: Connect as AQ_DEV user.
connect AQ_DEV/&pass_aq_dev@linecode:1522/ora12plg;
Connected.
- Code Listing 16: Verify the AQ_DEV user connection.
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER                 CON_NAME
-------------------- --------------------
AQ_DEV               ORA12PLG
- step 2.2.5: Create a message type. The message type must be created before the queue table and the queue.
- Code Listing 17: Create a message type.
CREATE OR REPLACE TYPE msg_typ AS OBJECT (
  sender_id NUMBER,
  subject   VARCHAR2(30),
  text      VARCHAR2(1000),
  salary    NUMBER
)
/
Type created.
- step 2.2.6: Create a multiconsumer queue table and a queue.
- Code Listing 18: Create a multiconsumer queue table and a queue
SET SERVEROUTPUT ON;
BEGIN
  -- create a queue table
  dbms_aqadm.create_queue_table(
    queue_table        => 'multi_msgs12_csumers_qtab',
    multiple_consumers => true,
    queue_payload_type => 'msg_typ'
  );
  -- create a queue
  dbms_aqadm.create_queue(
    queue_name  => 'msg12_queue_multiple',
    queue_table => 'multi_msgs12_csumers_qtab'
  );
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while creating queue/queue table objects: ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
- Code Listing 19: Verify that the queue is created.
SET LINESIZE 300;
COLUMN name FORMAT a50;
COLUMN queue_table FORMAT a50;
COLUMN queue_type FORMAT a20;
SELECT name, queue_table, queue_type
FROM user_queues
/
NAME                             QUEUE_TABLE                QUEUE_TYPE
-------------------------------- -------------------------- --------------------
AQ$_MULTI_MSGS12_CSUMERS_QTAB_E  MULTI_MSGS12_CSUMERS_QTAB  EXCEPTION_QUEUE
MSG12_QUEUE_MULTIPLE             MULTI_MSGS12_CSUMERS_QTAB  NORMAL_QUEUE
- Code Listing 20: verify that the queue table is created.
COLUMN queue_table FORMAT a50;
COLUMN object_type FORMAT a50;
COLUMN TYPE FORMAT a7;
SELECT
queue_table,
object_type,
type
FROM
user_queue_tables
/
QUEUE_TABLE OBJECT_TYPE TYPE
-------------------------------------------------- -------------------------------------------------- ------
MULTI_MSGS12_CSUMERS_QTAB AQ_DEV.MSG_TYP OBJECT
SQL>
- step 2.2.7: Start the queue
- Code Listing 21: Start the queue
BEGIN
  dbms_aqadm.start_queue(queue_name => 'msg12_queue_multiple');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while starting the queue: ' || sqlerrm);
END;
/
SQL> BEGIN
  2  dbms_aqadm.start_queue(queue_name => 'msg12_queue_multiple');
  3  EXCEPTION
  4  WHEN OTHERS THEN
  5  dbms_output.put_line('Error while starting the queue: ' || sqlerrm);
  6  END;
  7  /
PL/SQL procedure successfully completed.
SQL>
- step 3 : Set up the source environment.
- Condition 1: Connect as sysdba in the source environment.
- Execute the statement sqlplus / as sysdba from a command line in <oracle_home>\bin, where <oracle_home> is an environment variable set in your Windows registry.
- Code Listing 22: Connect as sysdba in the source environment.
<oracle_home>\bin>sqlplus / as sysdba;
SQL*Plus: Release 18.0.0.0.0 - Production on Tue Jul 30 22:57:29 2019
Version 18.3.0.0.0
Copyright (c) 1982, 2018, Oracle. All rights reserved.
Connected to:
Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 - Production
Version 18.3.0.0.0
- Condition 3: Verify the status of the PDB.
- Code Listing 23:
COLUMN name FORMAT a20;
COLUMN open_mode FORMAT a20;
select name, open_mode
from V$PDBS
where name = 'ORAPLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               MOUNTED
- Condition 4: Open ORAPLG pdb in read/write mode.
- Code Listing 24:
alter pluggable database oraplg open
/
Pluggable database altered.
select name, open_mode
from V$PDBS
where name = 'ORAPLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               READ WRITE
- Condition 5: Verify on which database we are connected to, pluggable or container database.
- Code Listing 25:
column user format A20;
column con_name format A20;
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  CDB$ROOT
- Condition 6: Connect to the PDB instead of the CDB.
- Code Listing 26:
alter session set container=ORAPLG
/
Session altered.
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  ORAPLG
- Step 3.1 : Create a specific tablespace for user aq_dev’s objects.
- Code Listing 27: Provide a name for the tablespace, ex. aq18c_tbs.
ACCEPT tbs18c_name CHAR PROMPT 'Enter the tablespace''s name in oraplg : ';
Enter the tablespace's name in oraplg : aq18c_tbs
- Code Listing 28: Create aq18c_tbs tablespace.
set serveroutput on;
set verify off;
DECLARE
  vcount_adm_tbs INTEGER := 0;
  v_plsql        VARCHAR2(500);
  v_tbs_path     VARCHAR2(100);
  v_tbs_name     VARCHAR2(20) := upper('&tbs18c_name');
BEGIN
  SELECT COUNT(1)
  INTO vcount_adm_tbs
  FROM v$tablespace
  WHERE name = upper(v_tbs_name);
  IF vcount_adm_tbs != 0 THEN
    EXECUTE IMMEDIATE ( 'drop tablespace ' || upper(v_tbs_name) || ' including contents and datafiles cascade constraints' );
  END IF;
  SELECT regexp_replace(ddf.file_name, '.[[:alnum:]]+', '', instr(ddf.file_name, '\', - 1))
  INTO v_tbs_path
  FROM dba_data_files ddf
  WHERE ROWNUM < 2;
  v_plsql := 'create tablespace ' || v_tbs_name
    || ' datafile ''' || v_tbs_path || '\' || v_tbs_name
    || '.DBF'' SIZE 5M REUSE EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO';
  EXECUTE IMMEDIATE v_plsql;
  dbms_output.put_line('Tablespace '||v_tbs_name|| ' successfully created!.') ;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while dropping/creating tablespace ( '||v_tbs_name||' )' || sqlerrm);
END;
/
Tablespace AQ18C_TBS successfully created!.
PL/SQL procedure successfully completed.
- Step 3.2 : Drop the AQ_DEV user, including its database objects, if it already exists.
- Code Listing 29: Drop AQ_DEV if it already exists.
DECLARE
  vcount_aq_dev INTEGER := 0;
BEGIN
  SELECT COUNT(1)
  INTO vcount_aq_dev
  FROM dba_users
  WHERE username = 'AQ_DEV';
  IF vcount_aq_dev != 0 THEN
    EXECUTE IMMEDIATE ( 'DROP USER AQ_DEV CASCADE' );
    dbms_output.put_line('AQ_DEV user dropped');
  ELSE
    dbms_output.put_line('AQ_DEV user not found');
  END IF;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while dropping user : ' || sqlerrm);
END;
/
AQ_DEV user dropped
PL/SQL procedure successfully completed.
- Step 3.3 : Create the AQ_DEV user with the necessary privileges in the oraplg PDB.
- Code Listing 30: Provide password for AQ_DEV user.
ACCEPT pass_aq_dev CHAR PROMPT 'Enter the password for aq_dev : ' HIDE;
Enter the password for aq_dev :
- Code Listing 31: Create the AQ_DEV user with the following privileges.
CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
DEFAULT TABLESPACE &tbs18c_name;
GRANT dba, CREATE ANY TYPE TO aq_dev;
GRANT aq_administrator_role TO aq_dev;
GRANT CONNECT TO aq_dev;
GRANT CREATE TYPE TO aq_dev;
GRANT EXECUTE ON dbms_aq TO aq_dev;
GRANT SELECT ON hr.employees TO aq_dev;
GRANT CREATE SEQUENCE TO aq_dev
/
SQL> CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
  2  DEFAULT TABLESPACE &tbs18c_name;
User created.
SQL> GRANT dba, CREATE ANY TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT aq_administrator_role TO aq_dev;
Grant succeeded.
SQL> GRANT CONNECT TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT EXECUTE ON dbms_aq TO aq_dev;
Grant succeeded.
SQL> GRANT SELECT ON hr.employees TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE SEQUENCE TO aq_dev
  2  /
Grant succeeded.
- Step 3.4 : Connect as the AQ_DEV user to the ORAPLG PDB and set up the administrative AQ types.
- Code Listing 32: Connect as AQ_DEV.
connect AQ_DEV/&pass_aq_dev@linecode:1521/oraplg;
Connected.
- Code Listing 33: Verify the AQ_DEV user connection.
SELECT user, sys_context('USERENV', 'con_name') AS con_name
FROM dual
/
USER                 CON_NAME
-------------------- --------------------
AQ_DEV               ORAPLG
SQL>
- Step 3.5 : Test link_to_hr12 database link.
- Code Listing 34: Test link_to_hr12 database link. Display the employee with employee_id of 999
SET LINESIZE 250;
COLUMN employee_id FORMAT 999999;
COLUMN first_name FORMAT a10;
COLUMN last_name FORMAT a10;
COLUMN email FORMAT a25;
COLUMN phone_number FORMAT a20;
COLUMN job_id FORMAT a10;
COLUMN salary FORMAT 999999.99;
COLUMN commission_pct FORMAT 99.99;
COLUMN manager_id FORMAT 999999;
SELECT employee_id AS emp_id,
       first_name AS f_name,
       last_name AS l_name,
       email AS email,
       phone_number AS phone,
       job_id,
       salary,
       nvl(commission_pct,0) AS com_pct,
       manager_id AS mgr_id
FROM hr.employees@link_to_hr12
WHERE employee_id = 999
/
EMP_ID F_NAME L_NAME EMAIL       PHONE      JOB_ID SALARY  COM_PCT MGR_ID
------ ------ ------ ----------- ---------- ------ ------- ------- ------
   999 Salem  Houali salemhouali 5147949706 AC_MGR 8500.00       0    101
SQL>
- Step 3.6 : Create seq_adm sequence.
- Code Listing 35: Create seq_adm sequence.
DECLARE
  vcount_seq_adm INTEGER := 0;
BEGIN
  SELECT COUNT(1)
  INTO vcount_seq_adm
  FROM dba_sequences
  WHERE sequence_name LIKE 'SEQ_AD%'
  AND sequence_owner = 'AQ_DEV';
  IF vcount_seq_adm <> 0 THEN
    EXECUTE IMMEDIATE ( 'drop sequence SEQ_ADM' );
    EXECUTE IMMEDIATE ( 'create sequence SEQ_ADM start with 1 increment by 1 NOCACHE NOCYCLE' );
  ELSE
    EXECUTE IMMEDIATE ( 'create sequence SEQ_ADM start with 1 increment by 1 NOCACHE NOCYCLE' );
  END IF;
  dbms_output.put_line('SEQ_ADM Sequence Created!');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while creating seq_adm sequence : ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
- Step 3.7 : Create a message type. The message type must be created before the queue table and the queue.
- Code Listing 36: Create a message type
CREATE OR REPLACE TYPE msg_typ AS OBJECT (
  sender_id NUMBER,
  subject   VARCHAR2(30),
  text      VARCHAR2(1000),
  salary    NUMBER
)
/
Type created.
- Step 3.8: Create a multiconsumer queue table and a queue.
- Code Listing 37: Create queue and queue table.
BEGIN
  -- Queue table creation
  dbms_aqadm.create_queue_table(
    queue_table        => 'multi_msgs18_csumers_qtab',
    multiple_consumers => true,
    queue_payload_type => 'msg_typ',
    comment            => 'Queue table in oraplg pluggable18c database'
  );
  -- Queue creation
  dbms_aqadm.create_queue(
    queue_name  => 'msg18_queue_multiple',
    queue_table => 'multi_msgs18_csumers_qtab',
    comment     => 'Queue table in oraplg pluggable18c database'
  );
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while creating queue table / queue : ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
- Code Listing 38: Verify that the queue is created.
COLUMN name FORMAT a50;
COLUMN queue_table FORMAT a50;
COLUMN queue_type FORMAT a20;
SELECT name, queue_table, queue_type
FROM user_queues
/
NAME                             QUEUE_TABLE                QUEUE_TYPE
-------------------------------- -------------------------- --------------------
AQ$_MULTI_MSGS18_CSUMERS_QTAB_E  MULTI_MSGS18_CSUMERS_QTAB  EXCEPTION_QUEUE
MSG18_QUEUE_MULTIPLE             MULTI_MSGS18_CSUMERS_QTAB  NORMAL_QUEUE
- Code Listing 39: Verify that the queue table is created.
COLUMN queue_table FORMAT a50;
COLUMN object_type FORMAT a50;
COLUMN TYPE FORMAT a7;
SELECT queue_table, object_type, type
FROM user_queue_tables
/
QUEUE_TABLE                OBJECT_TYPE     TYPE
-------------------------- --------------- -------
MULTI_MSGS18_CSUMERS_QTAB  AQ_DEV.MSG_TYP  OBJECT
- Step 3.8: Start the queue
- Code Listing 40: Start the queue.
BEGIN
  dbms_aqadm.start_queue(queue_name => 'msg18_queue_multiple');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while starting msg18_queue_multiple queue: ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
- Step 3.9: Verify the propagation Queue Type.
- DBMS_AQADM.VERIFY_QUEUE_TYPES verifies that the source and destination queues have identical types.
- Code Listing 41: Verify the propagation Queue Type.
DECLARE
  rc BINARY_INTEGER;
BEGIN
  dbms_aqadm.verify_queue_types(
    src_queue_name  => 'msg18_queue_multiple',
    dest_queue_name => 'msg12_queue_multiple',
    destination     => 'LINK_TO_HR12',
    rc              => rc
  );
  dbms_output.put_line('Compatible : '||rc);
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('verify_queue_types - error : ' || sqlerrm);
END;
/
VQT: new style queue
Compatible : 1
PL/SQL procedure successfully completed.
- Step 3.10: Schedule propagation from source to target environment.
- Code Listing 42: Schedule propagation from source to target environment.
BEGIN
  dbms_aqadm.schedule_propagation(
    queue_name  => 'msg18_queue_multiple',
    destination => 'LINK_TO_HR12'
  );
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('error while scheduling propagation : ' || sqlerrm);
END;
/
- Code Listing 43: Verify if our queue is scheduled in AQ_DEV schema.
COLUMN qname FORMAT a50;
COLUMN destination FORMAT a50;
COLUMN message_delivery_mode FORMAT a10;
COLUMN job_name FORMAT a50;
SELECT qname, destination, message_delivery_mode, job_name
FROM user_queue_schedules
where destination like '%LINK_TO_HR%'
/
QNAME                 DESTINATION   MESSAGE_DE JOB_NAME
--------------------- ------------- ---------- -------------
MSG18_QUEUE_MULTIPLE  LINK_TO_HR12  PERSISTENT AQ_JOB$_2092
- Step 3.11: Create a function, can_2_euro_salary, for our transformation. The function takes a salary value in Canadian dollars and converts it to euros.
- Code Listing 44: Create can_2_euro_salary function.
CREATE OR REPLACE FUNCTION can_2_euro_salary ( src msg_typ )
RETURN msg_typ
AS
  target msg_typ;
BEGIN
  target := msg_typ(src.sender_id, src.subject, upper(src.text), trunc((src.salary * .68), 2) );
  return(target);
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error in can_2_euro_salary function : ' || sqlerrm);
    RAISE; -- re-raise so the function never ends without returning a value
END can_2_euro_salary;
/
Function created.
- Step 3.12: Create a message format transformation.
- Code Listing 45: Create a message format transformation.
BEGIN
  dbms_transform.create_transformation(
    schema         => 'AQ_DEV',
    name           => 'cad$_2_euro',
    from_schema    => 'AQ_DEV',
    from_type      => 'msg_typ',
    to_schema      => 'AQ_DEV',
    to_type        => 'msg_typ',
    transformation => 'AQ_DEV.can_2_euro_salary(source.user_data)'
  );
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Error while creating cad$_2_euro - transformation : ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
- Code Listing 46: Verify that the transformation is created in AQ_DEV schema.
COLUMN transformation_id FORMAT 9999;
COLUMN name FORMAT A20;
COLUMN from_type FORMAT A20;
COLUMN TO_TYPE FORMAT A20;
SELECT transformation_id as trn_id, name, from_type, to_type
FROM user_transformations
/
    TRN_ID NAME                 FROM_TYPE            TO_TYPE
---------- -------------------- -------------------- --------------------
       141 CAD$_2_EURO          AQ_DEV.MSG_TYP       AQ_DEV.MSG_TYP
- Step 3.13: Add the subscribers that will dequeue messages in the target environment.
- Code Listing 47: Adding subscribers.
DECLARE
  subscriber sys.aq$_agent;
BEGIN
  subscriber := sys.aq$_agent('Irina', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
  dbms_aqadm.add_subscriber(
    queue_name => 'msg18_queue_multiple',
    subscriber => subscriber
  );
  -- cad$_2_euro transformation is specified only when adding Nastasia subscriber
  subscriber := sys.aq$_agent('Nastasia', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
  dbms_aqadm.add_subscriber(
    queue_name     => 'msg18_queue_multiple',
    subscriber     => subscriber,
    transformation => 'cad$_2_euro'
  );
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('error while adding subscribers: ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
- Code Listing 48: Verify the subscribers.
column QUEUE_NAME format A20;
column C_NAME format A10;
column ADDRESS format A45;
column QUEUE_TO_QUEUE format A15;
column TRANSFORMATION format A30;
select QUEUE_NAME, CONSUMER_NAME AS C_NAME, TRANSFORMATION, ADDRESS, QUEUE_TO_QUEUE
from USER_QUEUE_SUBSCRIBERS
/
QUEUE_NAME           C_NAME     TRANSFORMATION          ADDRESS                                       QUEUE_TO_QUEUE
-------------------- ---------- ----------------------- --------------------------------------------- ---------------
MSG18_QUEUE_MULTIPLE IRINA                              "AQ_DEV"."MSG12_QUEUE_MULTIPLE"@LINK_TO_HR12  FALSE
MSG18_QUEUE_MULTIPLE NASTASIA   "AQ_DEV"."CAD$_2_EURO"  "AQ_DEV"."MSG12_QUEUE_MULTIPLE"@LINK_TO_HR12  FALSE
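The subscriber addresses used above follow the pattern [schema.]queue[@dblink]. As a rough illustration of how such an address breaks down, here is a small Python sketch; the `parse_agent_address` helper is a hypothetical name, and the sketch assumes unquoted identifiers without embedded dots or @ signs.

```python
def parse_agent_address(address):
    """Split '[schema.]queue[@dblink]' into (schema, queue, dblink).

    Missing parts are returned as None. Quoted identifiers are not handled.
    """
    name, _, dblink = address.partition("@")
    schema, dot, queue = name.partition(".")
    if not dot:
        # No schema prefix: the whole name is the queue.
        schema, queue = None, name
    return schema, queue, dblink or None


print(parse_agent_address("AQ_DEV.msg12_queue_multiple@LINK_TO_HR12"))
print(parse_agent_address("msg18_queue_multiple"))
```

In this demo, the part after the @ sign is the database link created in step 1, which is what makes Irina and Nastasia remote subscribers.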
- Step 3.14: Enqueue messages.
We use the WIDTH_BUCKET function to enqueue only specific groups of salaries.
For a given expression, WIDTH_BUCKET divides the range between a minimum and a maximum value into a number of equal-width buckets and returns the number of the bucket into which the value of the expression falls. For more information on the WIDTH_BUCKET function, please refer to the Oracle documentation.
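To make the bucket arithmetic concrete, the following Python sketch mimics WIDTH_BUCKET for ascending bounds. It is an approximation for illustration, not Oracle's implementation: the `width_bucket` function here is a hypothetical helper, and it assumes integer inputs so that the division stays exact.

```python
def width_bucket(value, low, high, buckets):
    """Equi-width bucket number for ascending bounds (low < high).

    Values below `low` fall in bucket 0 and values at or above `high`
    fall in bucket `buckets + 1`. Integer inputs are assumed.
    """
    if value < low:
        return 0
    if value >= high:
        return buckets + 1
    return (value - low) * buckets // (high - low) + 1


# Same bounds as the demo: 10 buckets between 2400 and 12500.
for salary in (3500, 4800, 5800, 6500, 7500, 9000):
    print(salary, width_bucket(salary, 2400, 12500, 10))

# Keep only salaries whose bucket is between 2 and 6, like the WHERE clause.
selected = [s for s in (3500, 4800, 9000)
            if 2 <= width_bucket(s, 2400, 12500, 10) <= 6]
print(selected)
```

With these bounds each bucket is 1010 wide, so a salary of 3500 lands in bucket 2 and a salary of 7500 lands in bucket 6, which matches the groups shown in the listing that follows.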
- Code Listing 49: List of messages to enqueue for both Irina and Nastasia subscribers.
SET SERVEROUTPUT ON;
DECLARE
  CURSOR c_sal_grp IS
    SELECT employee_id AS empid, first_name, last_name, salary,
           width_bucket(salary, 2400, 12500, 10) "SALARY Group"
    FROM hr.employees
    WHERE width_bucket(salary, 2400, 12500, 10) BETWEEN 2 AND 6
    ORDER BY salary, "SALARY Group" ASC;
  r_sal_grp c_sal_grp%rowtype;
BEGIN
  dbms_output.put_line('beginning of list of messages to enqueue');
  dbms_output.put_line('Emp_Id first_name last_name Salary Group');
  dbms_output.put_line('====== ========== ========= ====== =====');
  OPEN c_sal_grp;
  FETCH c_sal_grp INTO r_sal_grp;
  LOOP
    EXIT WHEN c_sal_grp%notfound;
    dbms_output.put_line(rpad(r_sal_grp.empid, 20, ' ') || ' '
      || rpad(r_sal_grp.first_name, 20, ' ') || ' '
      || rpad(r_sal_grp.last_name, 20, ' ') || ' '
      || rpad(TO_CHAR(r_sal_grp.salary, '$9G999G999D99'), 20, ' ') || ' '
      || r_sal_grp."SALARY Group");
    FETCH c_sal_grp INTO r_sal_grp;
  END LOOP;
  CLOSE c_sal_grp;
  dbms_output.put_line('End of list of messages to enqueue');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('error : ' || sqlerrm);
END;
/
beginning of list of messages to enqueue
Emp_Id first_name   last_name   Salary     Group
====== ==========   =========   ======     =====
141    Trenna       Rajs        $3,500.00  2
137    Renske       Ladwig      $3,600.00  2
189    Jennifer     Dilly       $3,600.00  2
188    Kelly        Chung       $3,800.00  2
193    Britney      Everett     $3,900.00  2
192    Sarah        Bell        $4,000.00  2
185    Alexis       Bull        $4,100.00  2
107    Diana        Lorentz     $4,200.00  2
184    Nandita      Sarchand    $4,200.00  2
200    Jennifer     Whalen      $4,400.00  2
105    David        Austin      $4,800.00  3
106    Valli        Pataballa   $4,800.00  3
124    Kevin        Mourgos     $5,800.00  4
104    Bruce        Ernst       $6,000.00  4
202    Pat          Fay         $6,000.00  4
173    Sundita      Kumar       $6,100.00  4
179    Charles      Johnson     $6,200.00  4
167    Amit         Banda       $6,200.00  4
166    Sundar       Ande        $6,400.00  4
123    Shanta       Vollman     $6,500.00  5
203    Susan        Mavris      $6,500.00  5
165    David        Lee         $6,800.00  5
113    Luis         Popp        $6,900.00  5
178    Kimberely    Grant       $7,000.00  5
155    Oliver       Tuvault     $7,000.00  5
161    Sarath       Sewall      $7,000.00  5
164    Mattea       Marvins     $7,200.00  5
172    Elizabeth    Bates       $7,300.00  5
171    William      Smith       $7,400.00  5
154    Nanette      Cambrault   $7,500.00  6
160    Louise       Doran       $7,500.00  6
111    Ismael       Sciarra     $7,700.00  6
112    Jose Manuel  Urman       $7,800.00  6
122    Payam        Kaufling    $7,900.00  6
120    Matthew      Weiss       $8,000.00  6
153    Christopher  Olsen       $8,000.00  6
159    Lindsey      Smith       $8,000.00  6
121    Adam         Fripp       $8,200.00  6
110    John         Chen        $8,200.00  6
206    William      Gietz       $8,300.00  6
177    Jack         Livingston  $8,400.00  6
End of list of messages to enqueue
PL/SQL procedure successfully completed.
- Code Listing 50: Enqueue message to subscriber Irina.
SET SERVEROUTPUT ON;
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            msg_typ;
  recipients         dbms_aq.aq$_recipient_list_t;
  CURSOR c_sal_grp IS
    SELECT employee_id AS empid, first_name, last_name, salary,
           width_bucket(salary, 2400, 12500, 10) "SALARY Group"
    FROM hr.employees
    WHERE width_bucket(salary, 2400, 12500, 10) BETWEEN 2 AND 6
    ORDER BY salary, "SALARY Group" ASC;
  r_sal_grp c_sal_grp%rowtype;
BEGIN
  dbms_output.put_line('Beginning of the message list enqueuing - Irina subscriber');
  OPEN c_sal_grp;
  FETCH c_sal_grp INTO r_sal_grp;
  recipients(1) := sys.aq$_agent('Irina', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
  message_properties.recipient_list := recipients;
  LOOP
    EXIT WHEN c_sal_grp%notfound;
    message := msg_typ(
      seq_adm.nextval,
      r_sal_grp."SALARY Group",
      rpad(r_sal_grp.first_name, 20, ' ') || ' ' || r_sal_grp.last_name,
      trunc(r_sal_grp.salary, 2)
    );
    dbms_aq.enqueue(
      queue_name         => 'msg18_queue_multiple',
      enqueue_options    => enqueue_options,
      message_properties => message_properties,
      payload            => message,
      msgid              => message_handle
    );
    COMMIT;
    FETCH c_sal_grp INTO r_sal_grp;
  END LOOP;
  dbms_output.put_line('Total messages enqueued - Irina subscriber => ' || c_sal_grp%rowcount);
  CLOSE c_sal_grp;
  dbms_output.put_line('End of the message list enqueuing - Irina subscriber');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('error : ' || sqlerrm);
END;
/
Beginning of the message list enqueuing - Irina subscriber
Total messages enqueued - Irina subscriber => 41
End of the message list enqueuing - Irina subscriber
PL/SQL procedure successfully completed.
SQL>
- Code Listing 51: Enqueue the same list of messages to subscriber Nastasia.
SET SERVEROUTPUT ON;
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            msg_typ;
  recipients         dbms_aq.aq$_recipient_list_t;
  CURSOR c_sal_grp IS
    SELECT employee_id AS empid, first_name, last_name, salary,
           width_bucket(salary, 2400, 12500, 10) "SALARY Group"
    FROM hr.employees
    WHERE width_bucket(salary, 2400, 12500, 10) BETWEEN 2 AND 6
    ORDER BY salary, "SALARY Group" ASC;
  r_sal_grp c_sal_grp%rowtype;
BEGIN
  dbms_output.put_line('Beginning of the message list enqueuing - Nastasia subscriber');
  OPEN c_sal_grp;
  FETCH c_sal_grp INTO r_sal_grp;
  recipients(1) := sys.aq$_agent('Nastasia', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
  message_properties.recipient_list := recipients;
  LOOP
    EXIT WHEN c_sal_grp%notfound;
    message := msg_typ(
      seq_adm.nextval,
      r_sal_grp."SALARY Group",
      rpad(r_sal_grp.first_name, 20, ' ') || ' ' || r_sal_grp.last_name,
      trunc(r_sal_grp.salary, 2)
    );
    dbms_aq.enqueue(
      queue_name         => 'msg18_queue_multiple',
      enqueue_options    => enqueue_options,
      message_properties => message_properties,
      payload            => message,
      msgid              => message_handle
    );
    COMMIT;
    FETCH c_sal_grp INTO r_sal_grp;
  END LOOP;
  dbms_output.put_line('Total messages enqueued - Nastasia subscriber => ' || c_sal_grp%rowcount);
  CLOSE c_sal_grp;
  dbms_output.put_line('End of the message list enqueuing - Nastasia subscriber');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('error : ' || sqlerrm);
END;
/
Beginning of the message list enqueuing - Nastasia subscriber
Total messages enqueued - Nastasia subscriber => 41
End of the message list enqueuing - Nastasia subscriber
PL/SQL procedure successfully completed.
- Code Listing 52: Dequeue messages of Nastasia consumer.
SET SERVEROUTPUT ON;
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            msg_typ;
  no_messages        EXCEPTION;
  end_of_group       EXCEPTION;
  PRAGMA exception_init ( no_messages, -25228 );
  PRAGMA exception_init ( end_of_group, -25235 );
BEGIN
  dequeue_options.wait := dbms_aq.no_wait;
  dequeue_options.navigation := dbms_aq.first_message;
  dequeue_options.consumer_name := 'Nastasia';
  dbms_output.put_line('Beginning of the message list dequeuing - Nastasia consumer');
  dbms_output.put_line('==========================================================');
  dbms_output.put_line('sender_id First Name Last Name Salary');
  dbms_output.put_line('========= ========== ========= ======');
  LOOP
    BEGIN
      dbms_aq.dequeue(
        queue_name         => 'msg12_queue_multiple',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
      );
      dbms_output.put_line(rpad(message.sender_id, 20, ' ') || ' '
        || rpad(message.text, 45, ' ') || ' '
        || TO_CHAR(rpad(message.salary, 45, ' '), '99G999D99C', 'NLS_NUMERIC_CHARACTERS = '',.'' NLS_ISO_CURRENCY=FRANCE' ));
      dequeue_options.navigation := dbms_aq.next_message;
    EXCEPTION
      WHEN end_of_group THEN
        dbms_output.put_line('Finished ' || message.subject);
        COMMIT;
        dequeue_options.navigation := dbms_aq.next_transaction;
    END;
  END LOOP;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('No more messages');
END;
/
Beginning of the message list dequeuing - Nastasia consumer
==========================================================
sender_id  First Name   Last Name   Salary
=========  ==========   =========   ======
1711       TRENNA       RAJS        2.380,00EUR
1712       RENSKE       LADWIG      2.448,00EUR
1713       JENNIFER     DILLY       2.448,00EUR
1714       KELLY        CHUNG       2.584,00EUR
1715       BRITNEY      EVERETT     2.652,00EUR
1716       SARAH        BELL        2.720,00EUR
1717       ALEXIS       BULL        2.788,00EUR
1718       DIANA        LORENTZ     2.856,00EUR
1719       NANDITA      SARCHAND    2.856,00EUR
1720       JENNIFER     WHALEN      2.992,00EUR
1721       DAVID        AUSTIN      3.264,00EUR
1722       VALLI        PATABALLA   3.264,00EUR
1723       KEVIN        MOURGOS     3.944,00EUR
1724       BRUCE        ERNST       4.080,00EUR
1725       PAT          FAY         4.080,00EUR
1726       SUNDITA      KUMAR       4.148,00EUR
1727       CHARLES      JOHNSON     4.216,00EUR
1728       AMIT         BANDA       4.216,00EUR
1729       SUNDAR       ANDE        4.352,00EUR
1730       SHANTA       VOLLMAN     4.420,00EUR
1731       SUSAN        MAVRIS      4.420,00EUR
1732       DAVID        LEE         4.624,00EUR
1733       LUIS         POPP        4.692,00EUR
1734       KIMBERELY    GRANT       4.760,00EUR
1735       OLIVER       TUVAULT     4.760,00EUR
1736       SARATH       SEWALL      4.760,00EUR
1737       MATTEA       MARVINS     4.896,00EUR
1738       ELIZABETH    BATES       4.964,00EUR
1739       WILLIAM      SMITH       5.032,00EUR
1740       NANETTE      CAMBRAULT   5.100,00EUR
1741       LOUISE       DORAN       5.100,00EUR
1742       ISMAEL       SCIARRA     5.236,00EUR
1743       JOSE MANUEL  URMAN       5.304,00EUR
1744       PAYAM        KAUFLING    5.372,00EUR
1745       MATTHEW      WEISS       5.440,00EUR
1746       CHRISTOPHER  OLSEN       5.440,00EUR
1747       LINDSEY      SMITH       5.440,00EUR
1748       ADAM         FRIPP       5.576,00EUR
1749       JOHN         CHEN        5.576,00EUR
1750       WILLIAM      GIETZ       5.644,00EUR
1751       JACK         LIVINGSTON  5.712,00EUR
No more messages
PL/SQL procedure successfully completed.
- Code Listing 53: Dequeue messages of Irina consumer.
SET SERVEROUTPUT ON;
DECLARE
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             msg_typ;
    no_messages         EXCEPTION;
    end_of_group        EXCEPTION;
    -- ORA-25228: timeout or end-of-fetch during message dequeue
    PRAGMA exception_init ( no_messages, -25228 );
    -- ORA-25235: fetched all messages in current transaction
    PRAGMA exception_init ( end_of_group, -25235 );
BEGIN
    -- Do not block when the queue is empty; start from the first message
    dequeue_options.wait          := dbms_aq.no_wait;
    dequeue_options.navigation    := dbms_aq.first_message;
    dequeue_options.consumer_name := 'Irina';
    dbms_output.put_line('Beginning of the message list dequeuing - Irina consumer');
    dbms_output.put_line('===========================================================');
    dbms_output.put_line('sender_id First Name Last Name Salary');
    dbms_output.put_line('========= ========== ========= ======');
    LOOP
        BEGIN
            dbms_aq.dequeue(
                queue_name         => 'msg12_queue_multiple',
                dequeue_options    => dequeue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle
            );
            -- No transformation for Irina: print the salary as enqueued, in dollars
            dbms_output.put_line(rpad(message.sender_id, 20, ' ')
                                 || ' '
                                 || rpad(message.text, 45, ' ')
                                 || ' '
                                 || TO_CHAR(message.salary, '$999G999D99'));
            dequeue_options.navigation := dbms_aq.next_message;
        EXCEPTION
            WHEN end_of_group THEN
                -- End of the current transaction group: commit and move to the next one
                dbms_output.put_line('Finished ' || message.subject);
                COMMIT;
                dequeue_options.navigation := dbms_aq.next_transaction;
        END;
    END LOOP;
EXCEPTION
    -- Raised once the queue holds no more messages for this consumer
    WHEN no_messages THEN
        dbms_output.put_line('No more messages');
END;
/

Beginning of the message list dequeuing - Irina consumer
===========================================================
sender_id First Name Last Name Salary
========= ========== ========= ======
1834 Trenna Rajs $3,500.00
1835 Renske Ladwig $3,600.00
1836 Jennifer Dilly $3,600.00
1837 Kelly Chung $3,800.00
1838 Britney Everett $3,900.00
1839 Sarah Bell $4,000.00
1840 Alexis Bull $4,100.00
1841 Diana Lorentz $4,200.00
1842 Nandita Sarchand $4,200.00
1843 Jennifer Whalen $4,400.00
1844 David Austin $4,800.00
1845 Valli Pataballa $4,800.00
1846 Kevin Mourgos $5,800.00
1847 Bruce Ernst $6,000.00
1848 Pat Fay $6,000.00
1849 Sundita Kumar $6,100.00
1850 Charles Johnson $6,200.00
1851 Amit Banda $6,200.00
1852 Sundar Ande $6,400.00
1853 Shanta Vollman $6,500.00
1854 Susan Mavris $6,500.00
1855 David Lee $6,800.00
1856 Luis Popp $6,900.00
1857 Kimberely Grant $7,000.00
1858 Oliver Tuvault $7,000.00
1859 Sarath Sewall $7,000.00
1860 Mattea Marvins $7,200.00
1861 Elizabeth Bates $7,300.00
1862 William Smith $7,400.00
1863 Nanette Cambrault $7,500.00
1864 Louise Doran $7,500.00
1865 Ismael Sciarra $7,700.00
1866 Jose Manuel Urman $7,800.00
1867 Payam Kaufling $7,900.00
1868 Matthew Weiss $8,000.00
1869 Christopher Olsen $8,000.00
1870 Lindsey Smith $8,000.00
1871 Adam Fripp $8,200.00
1872 John Chen $8,200.00
1873 William Gietz $8,300.00
1874 Jack Livingston $8,400.00
No more messages

PL/SQL procedure successfully completed.
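Comparing the two outputs row by row, each Nastasia amount equals the matching Irina amount multiplied by 0.68 (for example, 3,500.00 becomes 2.380,00 and 8,400.00 becomes 5.712,00). The query below is a minimal sketch that renders two of those salaries in both formats without touching the queue. The 0.68 factor is inferred from the printed figures rather than taken from the transformation definition, and the column aliases are illustrative only.

```sql
-- Assumption: a flat rate of 0.68 EUR per CAD, inferred from the two outputs above.
-- Column aliases (cad_salary, irina_view, nastasia_view) are hypothetical names.
SELECT cad_salary,
       -- Same mask as the Irina listing: separators come from the session NLS settings
       TO_CHAR(cad_salary, '$999G999D99') AS irina_view,
       -- Same mask as the Nastasia listing: separators and ISO currency set inline
       TO_CHAR(cad_salary * 0.68,
               '99G999D99C',
               'NLS_NUMERIC_CHARACTERS = '',.'' NLS_ISO_CURRENCY=FRANCE') AS nastasia_view
FROM (
    SELECT 3500 AS cad_salary FROM dual
    UNION ALL
    SELECT 8400 FROM dual
);
```

The first format mask takes its group and decimal separators from the session, whereas the second overrides them inline, which is why the Euro amounts use a period as the group separator and a comma as the decimal mark.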