Welcome to Salem Houali's Oracle Developer Notes

Oracle Advanced Queuing (AQ) demo

Introduction: 

Advanced Queuing (AQ) has been available for several versions of Oracle. 

It provides database-integrated message queuing. It is built on top of Oracle Streams and uses the features of Oracle Database so that messages can be stored persistently, propagated between queues on different computers and databases, and transmitted using Oracle Net Services and HTTP(S). (For more information, refer to the Oracle documentation.)

The following shows an example of the use of Oracle Database Advanced Queuing for implementing a publish/subscribe relationship between publisher Application A and subscribers App.C1, App.C2, and App.C4:

Purpose: 

This article shows how to schedule queue propagation from an Oracle Database 18c to an Oracle Database 12c using a database link.

It presents all the steps to follow to ensure that the demo runs successfully. We have a target environment on which Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 – 64bit Production is installed and a source environment on which Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 – Production Version 18.3.0.0.0 is installed.

All we need to run this demo is a configured pluggable database in both the source and the target environment.

Scenario: 

• We have two remote subscribers, Irina and Nastasia, in an Oracle Database 12c environment, and an AQ_DEV user in both the source and the target environment.

• The subscriber Nastasia is added with a transformation on the payload, while no transformation is specified for Irina.

• The same list of messages, containing salary amounts in Canadian dollars, is enqueued for both subscribers, Irina and Nastasia.

• When the messages are dequeued for the Nastasia subscriber, all salary amounts are converted to euros by the transformation.

Requirements:
• ora12plg is an Oracle 12c pluggable database. (Its configuration is not part of this demo.)
• oraplg is an Oracle 18c pluggable database. (Its configuration is not part of this demo.)
• Create a database link in the oraplg database.
• Set up the destination environment.
• Set up the source environment.
Step 1: Create a database link.
The database link must be created in the source environment, the oraplg Oracle 18c pluggable database. To achieve this, the pluggable database must be in an open and running state and some conditions must be satisfied. These conditions are listed from Condition 1 to Condition 5.
Execute the statement sqlplus / as sysdba from a command line in <oracle_home>\bin, where <oracle_home> is an environment variable set in your Windows registry.
SQL*Plus: Release 18.0.0.0.0 - Production on Sat Jun 8 13:33:33 2019
Version 18.3.0.0.0
Copyright (c) 1982, 2018, Oracle.  All rights reserved.
Connected to:
Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 - Production
Version 18.3.0.0.0
COLUMN name FORMAT a20;
COLUMN open_mode FORMAT a20;
SELECT name, open_mode
FROM v$pdbs
where name = 'ORAPLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               MOUNTED
alter pluggable database oraplg open read write
/
Pluggable database altered.
select name, open_mode
from V$PDBS
where name = 'ORAPLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               READ WRITE
COLUMN user FORMAT a10;
COLUMN con_name FORMAT a10;
SELECT user,
       sys_context('USERENV', 'con_name') AS con_name
 FROM dual
 /
USER       CON_NAME
---------- ----------
SYS        CDB$ROOT
alter session set container=ORAPLG
/
Session altered.
SELECT user,
    sys_context('USERENV', 'con_name') AS con_name
FROM   dual
/
USER       CON_NAME
---------- ----------
SYS        ORAPLG
Create a public database link using a service name
The following step shows how to create a public database link using a service name configured in the tnsnames.ora file.
First, open the tnsnames.ora file with a text editor such as Notepad++ or TextPad, and then copy the description containing the target database settings.
Note that the host and the port are those configured for the target database; for our demo, it’s the ora12plg Oracle 12c pluggable database.
Use the following syntax:
CREATE [PUBLIC] DATABASE LINK <link_name>
CONNECT TO { CURRENT_USER | <user> IDENTIFIED BY <password> }
USING '<service_name>';
  CREATE PUBLIC DATABASE LINK LINK_TO_HR12
   CONNECT TO "SYSTEM" IDENTIFIED BY PassXxXx
   USING '(DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = linecode)(PORT = 1522))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = ORA12PLG)
    )
  )'
/
Database link created.
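A quick way to confirm that the new link works before going further could look like the sketch below. It assumes you are still connected to the ORAPLG PDB as a privileged user. The link name may be stored with a database domain suffix, which is why the second query uses LIKE rather than an exact match.

```sql
-- Round trip through the link: returns one row if the target PDB is reachable
SELECT * FROM dual@link_to_hr12;

-- Inspect how the link is registered in the source PDB
COLUMN owner    FORMAT a10;
COLUMN db_link  FORMAT a30;
COLUMN username FORMAT a15;
SELECT owner, db_link, username
FROM   dba_db_links
WHERE  db_link LIKE 'LINK_TO_HR12%';
```

If the first query fails, the host, port, or service name in the USING clause is the first thing to re-check against tnsnames.ora on the target side.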
Step 2: Set up the target environment (ORA12PLG PDB in Oracle Database 12c).
First, we need to have our PDB in an open and running state. To achieve this, some conditions must be satisfied. These conditions are listed from Condition 1 to Condition 5.
Condition 1: Connect as sysdba in the target environment.
Execute the statement sqlplus / as sysdba from a command line in <oracle_home>\bin, where <oracle_home> is an environment variable set in your Windows registry.
<oracle_home>\bin>sqlplus / as sysdba;
SQL*Plus: Release 12.2.0.1.0 Production on Sat Jun 8 14:06:29 2019
Copyright (c) 1982, 2016, Oracle.  All rights reserved.
Connected to:
Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production
COLUMN name FORMAT a20;
COLUMN open_mode FORMAT a20;
select name, open_mode 
from V$PDBS
where name = 'ORA12PLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORA12PLG             MOUNTED
SQL>
alter pluggable database ora12plg open;
Pluggable database altered.
select name, open_mode 
from V$PDBS
where name = 'ORA12PLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORA12PLG             READ WRITE
column user format A20;
column con_name format A20;

SELECT user,
    sys_context('USERENV', 'con_name') AS con_name
FROM   dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  CDB$ROOT
alter session set container=ORA12PLG
/
Session altered.
SELECT user,
    sys_context('USERENV', 'con_name') AS con_name
FROM   dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  ORA12PLG
Step 2.2: Create the advanced queuing infrastructure in the target environment.
Step 2.2.1: Create a specific tablespace for user aq_dev’s objects
ACCEPT tbs12c_name CHAR PROMPT 'Enter the tablespace''s name in ora12plg : ';
Enter the tablespace's name in ora12plg : aq12c_tbs
Note that it is not necessary to specify any value for the tablespace other than its name. The same script is also run in the Oracle 18c PDB.
set serveroutput on;
set verify off;
DECLARE
   vcount_adm_tbs   INTEGER := 0;
   v_plsql          VARCHAR2(500);
   v_tbs_path       VARCHAR2(100);
   v_tbs_name       VARCHAR2(20) := upper('&tbs12c_name');

BEGIN

   SELECT COUNT(1)
   INTO vcount_adm_tbs
   FROM v$tablespace
   WHERE name = upper(v_tbs_name);

    IF vcount_adm_tbs != 0 THEN
       EXECUTE IMMEDIATE ( 'drop tablespace '
                           || upper(v_tbs_name)
                           || ' including contents and datafiles cascade constraints' );
   END IF;

   SELECT regexp_replace(ddf.file_name, '.[[:alnum:]]+', '', instr(ddf.file_name, '\', - 1))
   INTO v_tbs_path
   FROM dba_data_files ddf
   WHERE ROWNUM < 2;

   v_plsql := 'create tablespace '
              || v_tbs_name
              || ' datafile '''
              || v_tbs_path
              || '\'
              || v_tbs_name
              || '.DBF'' SIZE 5M REUSE EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO';

    EXECUTE IMMEDIATE v_plsql;
  dbms_output.put_line('Tablespace  '||v_tbs_name|| '  successfully created!.') ;

EXCEPTION
   WHEN OTHERS THEN
       dbms_output.put_line('Error while dropping/creating tablespace ( '||v_tbs_name||'  )' || sqlerrm);
END;
/
Tablespace  AQ12C_TBS  successfully created!.
PL/SQL procedure successfully completed.
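To see what the REGEXP_REPLACE expression in the script above does, it can be run on a literal file name. The path used here is hypothetical and only serves as an illustration; the second query is a simple check of the tablespace created in this demo.

```sql
-- Hypothetical data file path, used only for illustration.
-- INSTR(..., '\', -1) finds the last backslash; starting at that position,
-- each "any character followed by alphanumerics" group is removed,
-- which leaves only the directory part.
SELECT regexp_replace('C:\APP\ORADATA\ORCL\SYSTEM01.DBF',
                      '.[[:alnum:]]+',
                      '',
                      instr('C:\APP\ORADATA\ORCL\SYSTEM01.DBF', '\', -1)) AS tbs_path
FROM   dual;
-- returns C:\APP\ORADATA\ORCL

-- Check the data file of the tablespace that the script created
COLUMN tablespace_name FORMAT a20;
COLUMN file_name       FORMAT a60;
SELECT tablespace_name, file_name
FROM   dba_data_files
WHERE  tablespace_name = 'AQ12C_TBS';
```

Because the script takes the directory from the first row of dba_data_files, the new data file lands next to an existing one, so no path has to be typed in.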

set serveroutput on;
DECLARE
  vcount_aq_dev INTEGER := 0;
BEGIN
  SELECT COUNT(1)
  INTO vcount_aq_dev
  FROM dba_users
  WHERE username = 'AQ_DEV';
  IF vcount_aq_dev != 0 THEN
      EXECUTE IMMEDIATE ( 'DROP USER AQ_DEV CASCADE' );
      dbms_output.put_line('AQ_DEV user dropped');
  ELSE
      dbms_output.put_line('AQ_DEV user not found');
  END IF;
EXCEPTION
  WHEN OTHERS THEN
      dbms_output.put_line('Error while dropping user : ' || sqlerrm);
END;
/

AQ_DEV user dropped
PL/SQL procedure successfully completed.

ACCEPT pass_aq_dev CHAR PROMPT 'Enter the password for aq_dev : ' HIDE;
Enter the password for aq_dev :
CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
    DEFAULT TABLESPACE &tbs12c_name;
GRANT dba, CREATE ANY TYPE TO aq_dev;
GRANT CREATE ANY TYPE TO aq_dev;
GRANT aq_administrator_role TO aq_dev;
GRANT CREATE SESSION TO aq_dev;
GRANT connect TO aq_dev;
GRANT CREATE TYPE TO aq_dev;
GRANT EXECUTE ON dbms_aq TO aq_dev
/

SQL> CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
  2      DEFAULT TABLESPACE &tbs12c_name;
User created.
SQL> GRANT dba, CREATE ANY TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE ANY TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT aq_administrator_role TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE SESSION TO aq_dev;
Grant succeeded.
SQL> GRANT connect TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT EXECUTE ON dbms_aq TO aq_dev
  2  /
Grant succeeded.
SQL>
connect AQ_DEV/&pass_aq_dev@linecode:1522/ora12plg;
Connected.
SELECT
    user,
    sys_context('USERENV', 'con_name') AS con_name
FROM
    dual
/
USER                 CON_NAME
-------------------- --------------------
AQ_DEV               ORA12PLG
CREATE OR REPLACE TYPE msg_typ AS OBJECT (
    sender_id   NUMBER,
    subject     VARCHAR2(30),
    text        VARCHAR2(1000),
    salary      NUMBER
)
/
Type created.
SET SERVEROUTPUT ON;
BEGIN
-- create a queue table
    dbms_aqadm.create_queue_table(queue_table => 'multi_msgs12_csumers_qtab', 
                                  multiple_consumers => true, 
                                  queue_payload_type => 'msg_typ'
                                  );
-- create a queue
    dbms_aqadm.create_queue(queue_name => 'msg12_queue_multiple', 
                            queue_table => 'multi_msgs12_csumers_qtab'
                            );
EXCEPTION
   WHEN OTHERS THEN
        dbms_output.put_line('Error while creating queue/queue table objects: ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
SET LINESIZE 300;
COLUMN name FORMAT a50;
COLUMN queue_table FORMAT a50;
COLUMN queue_type FORMAT a20;

SELECT
    name,
    queue_table,
    queue_type
FROM
    user_queues
/
NAME                                   QUEUE_TABLE                                        QUEUE_TYPE
-------------------------------------- -------------------------------------------------- -----------
AQ$_MULTI_MSGS12_CSUMERS_QTAB_E        MULTI_MSGS12_CSUMERS_QTAB                          EXCEPTION_QUEUE
MSG12_QUEUE_MULTIPLE                   MULTI_MSGS12_CSUMERS_QTAB                          NORMAL_QUEUE
COLUMN queue_table FORMAT a50;
COLUMN object_type FORMAT a50;
COLUMN TYPE FORMAT a7;
SELECT
    queue_table,
    object_type,
    type
FROM
    user_queue_tables
/
QUEUE_TABLE                                        OBJECT_TYPE                                        TYPE
-------------------------------------------------- -------------------------------------------------- ------
MULTI_MSGS12_CSUMERS_QTAB                          AQ_DEV.MSG_TYP                                     OBJECT
SQL>
BEGIN
    dbms_aqadm.start_queue(queue_name => 'msg12_queue_multiple');
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error while starting the queue: ' || sqlerrm);
END;
/
SQL> BEGIN
  2      dbms_aqadm.start_queue(queue_name => 'msg12_queue_multiple');
  3  EXCEPTION
  4      WHEN OTHERS THEN
  5          dbms_output.put_line('Error while starting the queue: ' || sqlerrm);
  6  END;
  7  /
PL/SQL procedure successfully completed.
SQL>
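Starting a queue with the default arguments enables both enqueue and dequeue. A minimal check, assuming you are still connected as AQ_DEV in the ORA12PLG PDB, might be:

```sql
COLUMN name            FORMAT a30;
COLUMN enqueue_enabled FORMAT a15;
COLUMN dequeue_enabled FORMAT a15;
SELECT name, enqueue_enabled, dequeue_enabled
FROM   user_queues
WHERE  name = 'MSG12_QUEUE_MULTIPLE';
```

Both columns should report YES for a started queue. Since the START_QUEUE block above only prints errors through dbms_output, this query is a useful second look when SERVEROUTPUT happens to be off.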
This ends the setup of our target environment.
Step 3: Set up the source environment (ORAPLG PDB in Oracle Database 18c).
Note:
If your ORAPLG PDB is still in the same state as in Step 1 (Create a database link) and the container is set to the ORAPLG PDB, then it is not necessary to verify Condition 1 to Condition 5; otherwise, these conditions must be satisfied.
<oracle_home>\bin>sqlplus / as sysdba;
SQL*Plus: Release 18.0.0.0.0 - Production on Tue Jul 30 22:57:29 2019
Version 18.3.0.0.0
Copyright (c) 1982, 2018, Oracle.  All rights reserved.
Connected to:
Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 - Production
Version 18.3.0.0.0
COLUMN name FORMAT a20;
COLUMN open_mode FORMAT a20;
select name, open_mode 
from V$PDBS
where name = 'ORAPLG'
/
NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               MOUNTED
alter pluggable database oraplg open
/
Pluggable database altered.
select name, open_mode 
from V$PDBS
where name = 'ORAPLG'
/

NAME                 OPEN_MODE
-------------------- --------------------
ORAPLG               READ WRITE
column user format A20;
column con_name format A20;
SELECT user,
    sys_context('USERENV', 'con_name') AS con_name
FROM   dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  CDB$ROOT
alter session set container=ORAPLG
/
Session altered.
SELECT user,
    sys_context('USERENV', 'con_name') AS con_name
FROM   dual
/
USER                 CON_NAME
-------------------- --------------------
SYS                  ORAPLG
ACCEPT tbs18c_name CHAR PROMPT 'Enter the tablespace''s name in oraplg : ';
Enter the tablespace's name in oraplg : aq18c_tbs
set serveroutput on;
set verify off;

DECLARE
    vcount_adm_tbs   INTEGER := 0;
    v_plsql          VARCHAR2(500);
    v_tbs_path       VARCHAR2(100);
    v_tbs_name       VARCHAR2(20) := upper('&tbs18c_name');
BEGIN
    SELECT COUNT(1)
    INTO vcount_adm_tbs
    FROM v$tablespace
    WHERE name = upper(v_tbs_name);

    IF vcount_adm_tbs != 0 THEN
        EXECUTE IMMEDIATE ( 'drop tablespace '
                            || upper(v_tbs_name)
                            || ' including contents and datafiles cascade constraints' );
    END IF;
    SELECT regexp_replace(ddf.file_name, '.[[:alnum:]]+', '', instr(ddf.file_name, '\', - 1))
    INTO v_tbs_path
    FROM dba_data_files ddf
    WHERE ROWNUM < 2;
    v_plsql := 'create tablespace '
               || v_tbs_name
               || ' datafile '''
               || v_tbs_path
               || '\'
               || v_tbs_name
               || '.DBF'' SIZE 5M REUSE EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO';
    EXECUTE IMMEDIATE v_plsql;
   dbms_output.put_line('Tablespace  '||v_tbs_name|| '  successfully created!.') ;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error while dropping/creating tablespace ( '||v_tbs_name||'  )' || sqlerrm);
END;
/
Tablespace  AQ18C_TBS  successfully created!.
PL/SQL procedure successfully completed.

DECLARE
  vcount_aq_dev INTEGER := 0;
BEGIN
  SELECT COUNT(1)
  INTO vcount_aq_dev
  FROM dba_users
  WHERE username = 'AQ_DEV';
  IF vcount_aq_dev != 0 THEN
      EXECUTE IMMEDIATE ( 'DROP USER AQ_DEV CASCADE' );
      dbms_output.put_line('AQ_DEV user dropped');
  ELSE
      dbms_output.put_line('AQ_DEV user not found');
  END IF;

EXCEPTION
  WHEN OTHERS THEN
      dbms_output.put_line('Error while dropping user : ' || sqlerrm);
END;
/
AQ_DEV user dropped
PL/SQL procedure successfully completed.

ACCEPT pass_aq_dev CHAR PROMPT 'Enter the password for aq_dev : ' HIDE;
Enter the password for aq_dev :
CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
    DEFAULT TABLESPACE &tbs18c_name;
GRANT dba, CREATE ANY TYPE TO aq_dev;
GRANT aq_administrator_role TO aq_dev;
GRANT CONNECT TO aq_dev;
GRANT CREATE TYPE TO aq_dev;
GRANT EXECUTE ON dbms_aq TO aq_dev;
GRANT SELECT ON hr.employees TO aq_dev;
GRANT CREATE SEQUENCE TO aq_dev
/

SQL> CREATE USER aq_dev IDENTIFIED BY &pass_aq_dev
  2      DEFAULT TABLESPACE &tbs18c_name;
User created.
SQL> GRANT dba, CREATE ANY TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT aq_administrator_role TO aq_dev;
Grant succeeded.
SQL> GRANT CONNECT TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE TYPE TO aq_dev;
Grant succeeded.
SQL> GRANT EXECUTE ON dbms_aq TO aq_dev;
Grant succeeded.
SQL> GRANT SELECT ON hr.employees TO aq_dev;
Grant succeeded.
SQL> GRANT CREATE SEQUENCE TO aq_dev
  2  /
Grant succeeded.

connect AQ_DEV/&pass_aq_dev@linecode:1521/oraplg;
Connected.
SELECT
    user,
    sys_context('USERENV', 'con_name') AS con_name
FROM dual
/

USER                 CON_NAME
-------------------- --------------------
AQ_DEV               ORAPLG
SQL>

SET LINESIZE 250;
COLUMN employee_id FORMAT 999999;
COLUMN first_name FORMAT a10;
COLUMN last_name FORMAT a10;
COLUMN email FORMAT a25;
COLUMN phone_number FORMAT a20;
COLUMN job_id FORMAT a10;
COLUMN salary FORMAT 999999.99;
COLUMN commission_pct FORMAT 99.99;
COLUMN manager_id FORMAT 999999;
SELECT
    employee_id      AS emp_id,
    first_name       AS f_name,
    last_name        AS l_name,
    email            AS email,
    phone_number     AS phone,
    job_id,
    salary,
    nvl(commission_pct,0)   AS com_pct,
    manager_id       AS mgr_id
FROM
    hr.employees@link_to_hr12
WHERE
    employee_id = 999
/
    EMP_ID F_NAME     L_NAME     EMAIL           PHONE           JOB_ID         SALARY    COM_PCT     MGR_ID
---------- ---------- ---------- --------------- --------------- ---------- ---------- ---------- ----------
       999 Salem      Houali     salemhouali     5147949706      AC_MGR        8500.00          0        101
SQL>
DECLARE
    vcount_seq_adm INTEGER := 0;
BEGIN
    SELECT COUNT(1)
    INTO vcount_seq_adm
    FROM dba_sequences
    WHERE sequence_name = 'SEQ_ADM' -- exact match: only SEQ_ADM is dropped below
    AND sequence_owner = 'AQ_DEV';
    IF vcount_seq_adm <> 0 THEN
        EXECUTE IMMEDIATE ( 'drop sequence SEQ_ADM' );
        EXECUTE IMMEDIATE ( 'create sequence SEQ_ADM start with 1 increment by 1 NOCACHE NOCYCLE' );
    ELSE
        EXECUTE IMMEDIATE ( 'create sequence SEQ_ADM start with 1 increment by 1 NOCACHE NOCYCLE' );
    END IF;
    dbms_output.put_line('SEQ_ADM Sequence Created!');
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error while creating seq_adm sequence : ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
CREATE OR REPLACE TYPE msg_typ AS OBJECT (
    sender_id   NUMBER,
    subject     VARCHAR2(30),
    text        VARCHAR2(1000),
    salary      NUMBER
)
/
Type created.
BEGIN
-- Queue table creation
    dbms_aqadm.create_queue_table(
                                queue_table => 'multi_msgs18_csumers_qtab', 
                                multiple_consumers => true, queue_payload_type => 'msg_typ', 
                                comment => 'Queue table in oraplg pluggable18c database'
                                );
-- Queue creation
    dbms_aqadm.create_queue(
                            queue_name => 'msg18_queue_multiple', 
                            queue_table => 'multi_msgs18_csumers_qtab', 
                            comment => 'Queue table in oraplg pluggable18c database'
                            );
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error while creating queue table / queue : ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
COLUMN name FORMAT a50;
COLUMN queue_table FORMAT a50;
COLUMN queue_type FORMAT a20;
SELECT
    name,
    queue_table,
    queue_type
FROM
    user_queues
/
NAME                                  QUEUE_TABLE                                        QUEUE_TYPE
------------------------------------- -------------------------------------------------- --------------------
AQ$_MULTI_MSGS18_CSUMERS_QTAB_E       MULTI_MSGS18_CSUMERS_QTAB                          EXCEPTION_QUEUE
MSG18_QUEUE_MULTIPLE                  MULTI_MSGS18_CSUMERS_QTAB                          NORMAL_QUEUE
COLUMN queue_table FORMAT a50;
COLUMN object_type FORMAT a50;
COLUMN TYPE FORMAT a7;
SELECT
    queue_table,
    object_type,
    type
FROM
    user_queue_tables
/
QUEUE_TABLE                                        OBJECT_TYPE                                        TYPE
-------------------------------------------------- -------------------------------------------------- -------
MULTI_MSGS18_CSUMERS_QTAB                          AQ_DEV.MSG_TYP                                     OBJECT
BEGIN
    dbms_aqadm.start_queue(queue_name => 'msg18_queue_multiple');
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error while starting msg18_queue_multiple queue: ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
DECLARE
    rc BINARY_INTEGER;
BEGIN
    dbms_aqadm.verify_queue_types( src_queue_name => 'msg18_queue_multiple', 
                                   dest_queue_name => 'msg12_queue_multiple', 
                                   destination => 'LINK_TO_HR12', 
                                   rc => rc
                                   );
    dbms_output.put_line('Compatible :  '||rc);
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('verify_queue_types - error : ' || sqlerrm);
END;
/
VQT: new style queue
Compatible :  1
PL/SQL procedure successfully completed.
BEGIN
    dbms_aqadm.schedule_propagation(queue_name => 'msg18_queue_multiple', 
                                    destination => 'LINK_TO_HR12');
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('error while scheduling propagation : ' || sqlerrm);
END;
/
COLUMN qname                    FORMAT a50;
COLUMN destination              FORMAT a50;
COLUMN message_delivery_mode    FORMAT a10;
COLUMN job_name                 FORMAT a50;
SELECT
    qname,
    destination,
    message_delivery_mode,
    job_name
FROM
    user_queue_schedules
    where destination like '%LINK_TO_HR%'
/
QNAME                          DESTINATION                                        MESSAGE_DE JOB_NAME
------------------------------ -------------------------------------------------- ---------- --------------------------------------------------
MSG18_QUEUE_MULTIPLE           LINK_TO_HR12                                       PERSISTENT AQ_JOB$_2092
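Propagation problems, such as an unreachable database link, usually show up in the same view. The following query is a sketch of how the schedule could be monitored, using the SCHEDULE_DISABLED, FAILURES, and LAST_ERROR_MSG columns of USER_QUEUE_SCHEDULES:

```sql
COLUMN qname          FORMAT a25;
COLUMN destination    FORMAT a20;
COLUMN last_error_msg FORMAT a60;
SELECT qname, destination, schedule_disabled, failures, last_error_msg
FROM   user_queue_schedules
WHERE  destination LIKE '%LINK_TO_HR%';
```

A schedule showing SCHEDULE_DISABLED = N and FAILURES = 0 is generally what you want to see before enqueuing messages.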
CREATE OR REPLACE FUNCTION can_2_euro_salary (
    src msg_typ
) RETURN msg_typ AS
    target msg_typ;
BEGIN
    target := msg_typ(src.sender_id, 
                      src.subject, upper(src.text), 
                      trunc((src.salary * .68), 2)
                );
    return(target);
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error in can_2_euro_salary function : ' || sqlerrm);
        RAISE; -- re-raise so the function never ends without returning a value
END can_2_euro_salary;
/
Function created.
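Before wiring the function into a transformation, it can be exercised on its own. The block below is a small sketch that assumes the msg_typ type and the can_2_euro_salary function created above; the payload values are made up for the test.

```sql
SET SERVEROUTPUT ON;
DECLARE
    v_in  msg_typ := msg_typ(1, 'demo', 'hello', 1000);  -- made-up test payload
    v_out msg_typ;
BEGIN
    v_out := can_2_euro_salary(v_in);
    -- text is upper-cased; salary is multiplied by .68 and truncated to 2 decimals
    dbms_output.put_line(v_out.text || ' / ' || v_out.salary);
END;
/
```

With the .68 rate hard-coded in the function, this prints HELLO / 680. The rate is a fixed value chosen for the demo, not a live exchange rate.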
BEGIN
    dbms_transform.create_transformation(schema         => 'AQ_DEV', 
                                         name           => 'cad$_2_euro',
                                         from_schema    => 'AQ_DEV', 
                                         from_type      => 'msg_typ', 
                                         to_schema      => 'AQ_DEV', 
                                         to_type        => 'msg_typ', 
                                         transformation => 'AQ_DEV.can_2_euro_salary(source.user_data)'
                                         );
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error while creating cad$_2_euro - transformation : ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
COLUMN transformation_id FORMAT 9999
COLUMN name             FORMAT A20;
COLUMN from_type        FORMAT A20;
COLUMN TO_TYPE          FORMAT A20;
SELECT
    transformation_id as trn_id,
    name,
    from_type,
    to_type
FROM
    user_transformations
/
    TRN_ID NAME                 FROM_TYPE            TO_TYPE
---------- -------------------- -------------------- --------------------
       141 CAD$_2_EURO          AQ_DEV.MSG_TYP       AQ_DEV.MSG_TYP
DECLARE
    subscriber sys.aq$_agent;
BEGIN
    subscriber := sys.aq$_agent('Irina', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
    dbms_aqadm.add_subscriber(queue_name => 'msg18_queue_multiple', subscriber => subscriber);
    -- cad$_2_euro transformation is specified only when adding Nastasia subscriber
    subscriber := sys.aq$_agent('Nastasia', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
    dbms_aqadm.add_subscriber(queue_name     => 'msg18_queue_multiple',
                              subscriber     => subscriber,
                              transformation => 'cad$_2_euro'
                              );
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('error while adding subscribers: ' || sqlerrm);
END;
/
PL/SQL procedure successfully completed.
COLUMN queue_name     FORMAT a20;
COLUMN c_name         FORMAT a10;
COLUMN address        FORMAT a45;
COLUMN queue_to_queue FORMAT a15;
COLUMN transformation FORMAT a30;
SELECT
    queue_name,
    consumer_name AS c_name,
    transformation,
    address,
    queue_to_queue
FROM
    user_queue_subscribers
/
QUEUE_NAME           C_NAME     TRANSFORMATION                 ADDRESS                                       QUEUE_TO_QUEUE
-------------------- ---------- ------------------------------ --------------------------------------------- ---------------
MSG18_QUEUE_MULTIPLE IRINA                                     "AQ_DEV"."MSG12_QUEUE_MULTIPLE"@LINK_TO_HR12  FALSE
MSG18_QUEUE_MULTIPLE NASTASIA   "AQ_DEV"."CAD$_2_EURO"         "AQ_DEV"."MSG12_QUEUE_MULTIPLE"@LINK_TO_HR12  FALSE

We use the WIDTH_BUCKET function to enqueue only some specific groups of salaries.

Note that, for a given expression, WIDTH_BUCKET divides the range between a minimum and a maximum value into equal-width buckets and returns the number of the bucket into which the value of the expression falls. For more information on the WIDTH_BUCKET function, please refer to the Oracle documentation.
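As a small worked example, the query below applies the same arguments as the demo (minimum 2400, maximum 12500, 10 buckets) to three salary values taken from the listing further down. Each bucket is (12500 - 2400) / 10 = 1010 wide, so 3500 falls in bucket 2, 4800 in bucket 3, and 7500 in bucket 6.

```sql
SELECT 3500 AS salary, width_bucket(3500, 2400, 12500, 10) AS salary_group FROM dual
UNION ALL
SELECT 4800, width_bucket(4800, 2400, 12500, 10) FROM dual
UNION ALL
SELECT 7500, width_bucket(7500, 2400, 12500, 10) FROM dual;
```

The filter BETWEEN 2 AND 6 used in the demo therefore keeps salaries from 3410 up to, but not including, 8460.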

SET SERVEROUTPUT ON;
DECLARE
    CURSOR c_sal_grp IS
    SELECT
        employee_id AS empid,
        first_name,
        last_name,
        salary,
        width_bucket(salary, 2400, 12500, 10) "SALARY Group"
    FROM
        hr.employees
    WHERE
        width_bucket(salary, 2400, 12500, 10) BETWEEN 2 AND 6
    ORDER BY
        salary,
        "SALARY Group" ASC;
    r_sal_grp c_sal_grp%rowtype;
BEGIN
    dbms_output.put_line('beginning of list of messages to enqueue');
    dbms_output.put_line('Emp_Id               first_name           last_name                 Salary          Group');
    dbms_output.put_line('======               ==========           =========                 ======          =====');
    OPEN c_sal_grp;
    FETCH c_sal_grp INTO r_sal_grp;
    LOOP
        EXIT WHEN c_sal_grp%notfound;
        dbms_output.put_line(rpad(r_sal_grp.empid, 20, ' ')
                             || ' '
                             || rpad(r_sal_grp.first_name, 20, ' ')
                             || ' '
                             || rpad(r_sal_grp.last_name, 20, ' ')
                             || ' '
                             || rpad(TO_CHAR(r_sal_grp.salary, '$9G999G999D99'), 20, ' ')
                             || ' '
                             || r_sal_grp."SALARY Group");
        FETCH c_sal_grp INTO r_sal_grp;
    END LOOP;
    CLOSE c_sal_grp;
    dbms_output.put_line('End of list of messages to enqueue');
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('error   : ' || sqlerrm);
END;
/
beginning of list of messages to enqueue
Emp_Id               first_name           last_name                 Salary          Group
======               ==========           =========                 ======          =====
141                  Trenna               Rajs                      $3,500.00       2
137                  Renske               Ladwig                    $3,600.00       2
189                  Jennifer             Dilly                     $3,600.00       2
188                  Kelly                Chung                     $3,800.00       2
193                  Britney              Everett                   $3,900.00       2
192                  Sarah                Bell                      $4,000.00       2
185                  Alexis               Bull                      $4,100.00       2
107                  Diana                Lorentz                   $4,200.00       2
184                  Nandita              Sarchand                  $4,200.00       2
200                  Jennifer             Whalen                    $4,400.00       2
105                  David                Austin                    $4,800.00       3
106                  Valli                Pataballa                 $4,800.00       3
124                  Kevin                Mourgos                   $5,800.00       4
104                  Bruce                Ernst                     $6,000.00       4
202                  Pat                  Fay                       $6,000.00       4
173                  Sundita              Kumar                     $6,100.00       4
179                  Charles              Johnson                   $6,200.00       4
167                  Amit                 Banda                     $6,200.00       4
166                  Sundar               Ande                      $6,400.00       4
123                  Shanta               Vollman                   $6,500.00       5
203                  Susan                Mavris                    $6,500.00       5
165                  David                Lee                       $6,800.00       5
113                  Luis                 Popp                      $6,900.00       5
178                  Kimberely            Grant                     $7,000.00       5
155                  Oliver               Tuvault                   $7,000.00       5
161                  Sarath               Sewall                    $7,000.00       5
164                  Mattea               Marvins                   $7,200.00       5
172                  Elizabeth            Bates                     $7,300.00       5
171                  William              Smith                     $7,400.00       5
154                  Nanette              Cambrault                 $7,500.00       6
160                  Louise               Doran                     $7,500.00       6
111                  Ismael               Sciarra                   $7,700.00       6
112                  Jose Manuel          Urman                     $7,800.00       6
122                  Payam                Kaufling                  $7,900.00       6
120                  Matthew              Weiss                     $8,000.00       6
153                  Christopher          Olsen                     $8,000.00       6
159                  Lindsey              Smith                     $8,000.00       6
121                  Adam                 Fripp                     $8,200.00       6
110                  John                 Chen                      $8,200.00       6
206                  William              Gietz                     $8,300.00       6
177                  Jack                 Livingston                $8,400.00       6
End of list of messages to enqueue
PL/SQL procedure successfully completed.
SET SERVEROUTPUT ON;
DECLARE
    enqueue_options      dbms_aq.enqueue_options_t;
    message_properties   dbms_aq.message_properties_t;
    message_handle       RAW(16);
    message              msg_typ;
    recipients           dbms_aq.aq$_recipient_list_t;
    CURSOR c_sal_grp IS
    SELECT
        employee_id AS empid,
        first_name,
        last_name,
        salary,
        width_bucket(salary, 2400, 12500, 10) "SALARY Group"
    FROM
        hr.employees
    WHERE
        width_bucket(salary, 2400, 12500, 10) BETWEEN 2 AND 6
    ORDER BY
        salary,
        "SALARY Group" ASC;
    r_sal_grp            c_sal_grp%rowtype;
BEGIN
    dbms_output.put_line('Beginning of the message list enqueuing - Irina subscriber');
    OPEN c_sal_grp;
    FETCH c_sal_grp INTO r_sal_grp;
    recipients(1) := sys.aq$_agent('Irina', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
    message_properties.recipient_list := recipients;
    LOOP
        EXIT WHEN c_sal_grp%notfound;
        message := msg_typ( seq_adm.nextval, 
                            r_sal_grp."SALARY Group", 
                            rpad(r_sal_grp.first_name, 20, ' ')
                            || '  '
                            || r_sal_grp.last_name, trunc(r_sal_grp.salary, 2)
                            );
        dbms_aq.enqueue(queue_name => 'msg18_queue_multiple', 
                        enqueue_options => enqueue_options, 
                        message_properties => message_properties , 
                        payload => message, msgid => message_handle
                        );
        COMMIT;
        FETCH c_sal_grp INTO r_sal_grp;
    END LOOP;
    dbms_output.put_line('Total  messages  enqueued   -  Irina subscriber   =>  ' || c_sal_grp%rowcount);
    CLOSE c_sal_grp;
    dbms_output.put_line('End of the message list enqueuing - Irina subscriber');
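EXCEPTION
    WHEN OTHERS THEN
        -- Report the error, release the cursor, then re-raise so the failure is not hidden.
        dbms_output.put_line('error   : ' || sqlerrm);
        IF c_sal_grp%isopen THEN
            CLOSE c_sal_grp;
        END IF;
        RAISE;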
END;
/
Beginning of the message list enqueuing - Irina subscriber
Total  messages  enqueued   -  Irina subscriber   =>  41
End of the message list enqueuing - Irina subscriber
PL/SQL procedure successfully completed.
SET SERVEROUTPUT ON;
DECLARE
    enqueue_options      dbms_aq.enqueue_options_t;
    message_properties   dbms_aq.message_properties_t;
    message_handle       RAW(16);
    message              msg_typ;
    recipients           dbms_aq.aq$_recipient_list_t;
    CURSOR c_sal_grp IS
    SELECT
        employee_id AS empid,
        first_name,
        last_name,
        salary,
        width_bucket(salary, 2400, 12500, 10) "SALARY Group"
    FROM
        hr.employees
    WHERE
        width_bucket(salary, 2400, 12500, 10) BETWEEN 2 AND 6
    ORDER BY
        salary,
        "SALARY Group" ASC;
    r_sal_grp            c_sal_grp%rowtype;
BEGIN
    dbms_output.put_line('Beginning of the message list enqueuing - Nastasia subscriber');
    OPEN c_sal_grp;
    FETCH c_sal_grp INTO r_sal_grp;
    recipients(1) := sys.aq$_agent('Nastasia', 'AQ_DEV.msg12_queue_multiple@LINK_TO_HR12', NULL);
    message_properties.recipient_list := recipients;
    LOOP
        EXIT WHEN c_sal_grp%notfound;
        message := msg_typ( seq_adm.nextval, 
                            r_sal_grp."SALARY Group", 
                            rpad(r_sal_grp.first_name, 20, ' ')
                            || '  '
                            || r_sal_grp.last_name, trunc(r_sal_grp.salary, 2)
                            );
        dbms_aq.enqueue(queue_name => 'msg18_queue_multiple', 
                        enqueue_options => enqueue_options, 
                        message_properties => message_properties , 
                        payload => message, msgid => message_handle
                        );
        COMMIT;
        FETCH c_sal_grp INTO r_sal_grp;
    END LOOP;
    dbms_output.put_line('Total  messages  enqueued   -  Nastasia subscriber   =>  ' || c_sal_grp%rowcount);
    CLOSE c_sal_grp;
    dbms_output.put_line('End of the message list enqueuing - Nastasia subscriber');
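EXCEPTION
    WHEN OTHERS THEN
        -- Report the error, release the cursor, then re-raise so the failure is not hidden.
        dbms_output.put_line('error   : ' || sqlerrm);
        IF c_sal_grp%isopen THEN
            CLOSE c_sal_grp;
        END IF;
        RAISE;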
END;
/
Beginning of the message list enqueuing - Nastasia subscriber
Total  messages  enqueued   -  Nastasia subscriber   =>  41
End of the message list enqueuing - Nastasia subscriber
PL/SQL procedure successfully completed.
SET SERVEROUTPUT ON;
DECLARE
    dequeue_options      dbms_aq.dequeue_options_t;
    message_properties   dbms_aq.message_properties_t;
    message_handle       RAW(16);
    message              msg_typ;
    no_messages EXCEPTION;
    end_of_group EXCEPTION;
    PRAGMA exception_init ( no_messages, -25228 );
    PRAGMA exception_init ( end_of_group, -25235 );
BEGIN
    -- Do not block when the queue is empty; ORA-25228 is raised instead.
    dequeue_options.wait := dbms_aq.no_wait;
    -- Start from the first message available to this consumer.
    dequeue_options.navigation := dbms_aq.first_message;
    dequeue_options.consumer_name := 'Nastasia';
    dbms_output.put_line('Beginning of the message list dequeuing - Nastasia consumer');
    dbms_output.put_line('==========================================================');
    dbms_output.put_line('sender_id            First Name            Last Name               Salary');
    dbms_output.put_line('=========            ==========            =========               ======');
    LOOP 
    BEGIN
        dbms_aq.dequeue(queue_name => 'msg12_queue_multiple', 
                        dequeue_options => dequeue_options, 
                        message_properties => message_properties,
                        payload => message, 
                        msgid => message_handle
                        );
        -- Format the numeric salary directly; padding it to a string first only forces an implicit conversion.
        dbms_output.put_line(rpad(message.sender_id, 20, ' ')
                             || ' '
                             || rpad(message.text, 45, ' ')
                             || ' '
                             || TO_CHAR(message.salary, '99G999D99C', 'NLS_NUMERIC_CHARACTERS = '',.'' NLS_ISO_CURRENCY=FRANCE')
                             );
        dequeue_options.navigation := dbms_aq.next_message;
    EXCEPTION
        WHEN end_of_group THEN
            dbms_output.put_line('Finished ' || message.subject); 
            COMMIT;
            dequeue_options.navigation := dbms_aq.next_transaction;
    END;   
    END LOOP;
EXCEPTION
    WHEN no_messages THEN
        dbms_output.put_line('No more messages');
END;
/
Beginning of the message list dequeuing - Nastasia consumer
==========================================================
sender_id            First Name            Last Name               Salary
=========            ==========            =========               ======
1711                 TRENNA                RAJS                    2.380,00EUR
1712                 RENSKE                LADWIG                  2.448,00EUR
1713                 JENNIFER              DILLY                   2.448,00EUR
1714                 KELLY                 CHUNG                   2.584,00EUR
1715                 BRITNEY               EVERETT                 2.652,00EUR
1716                 SARAH                 BELL                    2.720,00EUR
1717                 ALEXIS                BULL                    2.788,00EUR
1718                 DIANA                 LORENTZ                 2.856,00EUR
1719                 NANDITA               SARCHAND                2.856,00EUR
1720                 JENNIFER              WHALEN                  2.992,00EUR
1721                 DAVID                 AUSTIN                  3.264,00EUR
1722                 VALLI                 PATABALLA               3.264,00EUR
1723                 KEVIN                 MOURGOS                 3.944,00EUR
1724                 BRUCE                 ERNST                   4.080,00EUR
1725                 PAT                   FAY                     4.080,00EUR
1726                 SUNDITA               KUMAR                   4.148,00EUR
1727                 CHARLES               JOHNSON                 4.216,00EUR
1728                 AMIT                  BANDA                   4.216,00EUR
1729                 SUNDAR                ANDE                    4.352,00EUR
1730                 SHANTA                VOLLMAN                 4.420,00EUR
1731                 SUSAN                 MAVRIS                  4.420,00EUR
1732                 DAVID                 LEE                     4.624,00EUR
1733                 LUIS                  POPP                    4.692,00EUR
1734                 KIMBERELY             GRANT                   4.760,00EUR
1735                 OLIVER                TUVAULT                 4.760,00EUR
1736                 SARATH                SEWALL                  4.760,00EUR
1737                 MATTEA                MARVINS                 4.896,00EUR
1738                 ELIZABETH             BATES                   4.964,00EUR
1739                 WILLIAM               SMITH                   5.032,00EUR
1740                 NANETTE               CAMBRAULT               5.100,00EUR
1741                 LOUISE                DORAN                   5.100,00EUR
1742                 ISMAEL                SCIARRA                 5.236,00EUR
1743                 JOSE MANUEL           URMAN                   5.304,00EUR
1744                 PAYAM                 KAUFLING                5.372,00EUR
1745                 MATTHEW               WEISS                   5.440,00EUR
1746                 CHRISTOPHER           OLSEN                   5.440,00EUR
1747                 LINDSEY               SMITH                   5.440,00EUR
1748                 ADAM                  FRIPP                   5.576,00EUR
1749                 JOHN                  CHEN                    5.576,00EUR
1750                 WILLIAM               GIETZ                   5.644,00EUR
1751                 JACK                  LIVINGSTON              5.712,00EUR
No more messages

PL/SQL procedure successfully completed.
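
As a quick sanity check on the listing above: Jack Livingston was enqueued with $8,400.00 and is dequeued by Nastasia as 5.712,00EUR, which suggests that the transformation multiplies each salary by 0.68. That rate is inferred from the two listings, not read from the transformation definition, so treat it as an assumption. The following standalone query is a minimal sketch that applies the inferred rate to one salary and formats it with the same mask as the dequeue block:

```sql
-- Assumption: 0.68 is the CAD-to-EUR rate inferred from the listings,
-- not a value taken from the transformation itself.
SELECT TO_CHAR(8400 * 0.68,
               '99G999D99C',
               'NLS_NUMERIC_CHARACTERS = '',.'' NLS_ISO_CURRENCY=FRANCE') AS salary_eur
FROM dual;
```

In the mask, G and D are the group and decimal separators set by NLS_NUMERIC_CHARACTERS (here a period and a comma), and C appends the ISO currency symbol of the territory named in NLS_ISO_CURRENCY. The result should agree with the last row of the listing, apart from leading padding.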
SET SERVEROUTPUT ON;
DECLARE
    dequeue_options      dbms_aq.dequeue_options_t;
    message_properties   dbms_aq.message_properties_t;
    message_handle       RAW(16);
    message              msg_typ;
    no_messages EXCEPTION;
    end_of_group EXCEPTION;
    PRAGMA exception_init ( no_messages, -25228 );
    PRAGMA exception_init ( end_of_group, -25235 );
BEGIN
    -- Do not block when the queue is empty; ORA-25228 is raised instead.
    dequeue_options.wait := dbms_aq.no_wait;
    -- Start from the first message available to this consumer.
    dequeue_options.navigation := dbms_aq.first_message;
    dequeue_options.consumer_name := 'Irina';
    dbms_output.put_line('Beginning of the message list dequeuing - Irina consumer');
    dbms_output.put_line('===========================================================');
    dbms_output.put_line('sender_id            First Name            Last Name                  Salary');
    dbms_output.put_line('=========            ==========            =========                  ======');
    LOOP 
    BEGIN
        dbms_aq.dequeue(queue_name => 'msg12_queue_multiple', 
                        dequeue_options => dequeue_options, 
                        message_properties => message_properties,
                        payload => message, 
                        msgid => message_handle
                        );
        -- Format the numeric salary directly; padding it to a string first only forces an implicit conversion.
        dbms_output.put_line(rpad(message.sender_id, 20, ' ')
                             || ' '
                             || rpad(message.text, 45, ' ')
                             || ' '
                             || TO_CHAR(message.salary, '$999G999D99')
                             );
        dequeue_options.navigation := dbms_aq.next_message;
    EXCEPTION
        WHEN end_of_group THEN
            dbms_output.put_line('Finished ' || message.subject); 
            COMMIT;
            dequeue_options.navigation := dbms_aq.next_transaction;
    END;   
    END LOOP;
EXCEPTION
    WHEN no_messages THEN
        dbms_output.put_line('No more messages');
END;
/
Beginning of the message list dequeuing - Irina consumer
===========================================================
sender_id            First Name            Last Name                  Salary
=========            ==========            =========                  ======
1834                 Trenna                Rajs                       $3,500.00
1835                 Renske                Ladwig                     $3,600.00
1836                 Jennifer              Dilly                      $3,600.00
1837                 Kelly                 Chung                      $3,800.00
1838                 Britney               Everett                    $3,900.00
1839                 Sarah                 Bell                       $4,000.00
1840                 Alexis                Bull                       $4,100.00
1841                 Diana                 Lorentz                    $4,200.00
1842                 Nandita               Sarchand                   $4,200.00
1843                 Jennifer              Whalen                     $4,400.00
1844                 David                 Austin                     $4,800.00
1845                 Valli                 Pataballa                  $4,800.00
1846                 Kevin                 Mourgos                    $5,800.00
1847                 Bruce                 Ernst                      $6,000.00
1848                 Pat                   Fay                        $6,000.00
1849                 Sundita               Kumar                      $6,100.00
1850                 Charles               Johnson                    $6,200.00
1851                 Amit                  Banda                      $6,200.00
1852                 Sundar                Ande                       $6,400.00
1853                 Shanta                Vollman                    $6,500.00
1854                 Susan                 Mavris                     $6,500.00
1855                 David                 Lee                        $6,800.00
1856                 Luis                  Popp                       $6,900.00
1857                 Kimberely             Grant                      $7,000.00
1858                 Oliver                Tuvault                    $7,000.00
1859                 Sarath                Sewall                     $7,000.00
1860                 Mattea                Marvins                    $7,200.00
1861                 Elizabeth             Bates                      $7,300.00
1862                 William               Smith                      $7,400.00
1863                 Nanette               Cambrault                  $7,500.00
1864                 Louise                Doran                      $7,500.00
1865                 Ismael                Sciarra                    $7,700.00
1866                 Jose Manuel           Urman                      $7,800.00
1867                 Payam                 Kaufling                   $7,900.00
1868                 Matthew               Weiss                      $8,000.00
1869                 Christopher           Olsen                      $8,000.00
1870                 Lindsey               Smith                      $8,000.00
1871                 Adam                  Fripp                      $8,200.00
1872                 John                  Chen                       $8,200.00
1873                 William               Gietz                      $8,300.00
1874                 Jack                  Livingston                 $8,400.00
No more messages

PL/SQL procedure successfully completed.
