AQ Basics


A co-worker of mine recently had a dilemma. He needed to have a scheduler job chain in one schema trigger a second scheduler job chain in another schema when it completed. He had already come up with some ideas  on how to accomplish this with a variety of home-grown solutions and he wanted to run them past me to see which I thought would work the best. Once I realized his requirement, however, I steered him in another direction entirely: Oracle Advanced Queuing.

Advanced Queuing (AQ) is a vastly underused (in my experience) feature of Oracle database that allows stored procedures, programs, jobs – essentially anything that speaks SQL or PL/SQL – to send messages to other procedures, programs, and jobs. In this case it is possible to configure an Oracle Scheduler job or job chain that is listens for a specific message in the queue and starts automatically when the message is received. I created the following script for him to use as a template when configuring his own queue, and thought I would share it as a look into the basic operations of AQ.

In my example there are two schemas: USER1, which owns the first Scheduler Job, and USER2, which owns the second Scheduler Job that is triggered by USER1.

/* 
   Step 1: Drop the Queue and the Queue Table 
*/

begin
   sys.dbms_aqadm.stop_queue (queue_name => 'USER2.JOB_MESSAGE_Q');
   sys.dbms_aqadm.drop_queue (queue_name => 'USER2.JOB_MESSAGE_Q'); 
end;
/

begin
   sys.dbms_aqadm.drop_queue_table (
      queue_table   => 'USER2.JOB_MESSAGE_QT');
end;
/

/* 
   Step 2: Drop and recreate the Type for the message 
*/

drop type user2.sync_message_type;

create or replace type user2.sync_message_type as object
 (
   message varchar2 (20)
 );
/

/* 
   Step 3: Create the Queue and the Queue Table 
*/

begin
    sys.dbms_aqadm.create_queue_table (
       queue_table          => 'USER2.JOB_MESSAGE_QT',
       queue_payload_type   => 'USER2.JOB_MESSAGE_TYPE',
       compatible           => '10.0.0',
       storage_clause       => 'TABLESPACE USERS
                                RESULT_CACHE (MODE DEFAULT)
                               PCTUSED    0
                                PCTFREE    10
                                INITRANS   1
                                MAXTRANS   255
                                STORAGE    (
                                INITIAL          64K
                                NEXT             1M
                                MAXSIZE          UNLIMITED
                                MINEXTENTS       1
                                MAXEXTENTS       UNLIMITED
                                PCTINCREASE      0
                                BUFFER_POOL      DEFAULT
                                FLASH_CACHE      DEFAULT
                                CELL_FLASH_CACHE DEFAULT
                                )',
       sort_list            => 'ENQ_TIME',
      multiple_consumers   => true,
      message_grouping     => 0,
      secure               => false);
end;
/

begin
   sys.dbms_aqadm.create_queue (
      queue_name       => 'USER2.JOB_MESSAGE_Q',
      queue_table      => 'USER2.JOB_MESSAGE_QT',
      queue_type       => sys.dbms_aqadm.normal_queue,
      max_retries      => 5,
      retry_delay      => 0,
      retention_time   => 0);
end;
/

begin
   sys.dbms_aqadm.start_queue (queue_name   => 'USER2.JOB_MESSAGE_Q',
      enqueue      => true,
      dequeue      => true);
end;
/

/* 
   Step 4: Create views for job status checking - may need select on 
           the dba_scheduler views 
*/

drop view user2.api_scheduler_jobs;

create or replace force view user2.api_scheduler_jobs
 (
   job_name,
   enabled,
   state,
   last_start_date,
   last_run_duration,
   next_run_date
 )
 as
 select 
   job_name,
   enabled,
   state,
   last_start_date,
   last_run_duration,
   next_run_date
 from dba_scheduler_jobs
 where owner = 'USER2';

drop view user2.api_scheduler_job_run_details;

create or replace force view user2.api_scheduler_job_run_details
 (
   log_id,
   log_date,
   job_name,
   job_subname,
   status,
   error#,
   actual_start_date,
   run_duration,
   additional_info
 )
 as
 select 
   log_id,
   log_date,
   job_name,
   job_subname,
   status,
   error#,
   actual_start_date,
   run_duration,
   additional_info
 from dba_scheduler_job_run_details
 where owner = 'USER2';
 
/* 
   Step 5: Create a simple API to place messages in the queue 
*/

create or replace package user2.job_api_pkg
 as
    procedure job_run_cmd (p_command in varchar2);
end job_api_pkg;
/

create or replace package body user2.job_api_pkg
 as
    jobs_are_running       exception;
    jobs_are_running_nbr   number := -20101;
    jobs_are_running_msg   varchar2 (100)
         := 'Job is currently running - try again later.';

    function get_running_jobs (p_job_name in varchar2)
    return number
    is
       running_jobs   number;
    begin
       running_jobs := 0;

       select count (*)
         into running_jobs
         from user_scheduler_jobs
        where state like '%RUN%'
          and upper(job_name) = upper(p_job_name);

       return running_jobs;
    end get_running_jobs;

    procedure data_run_cmd (p_command in varchar2)
    is
       my_msgid   raw (16);
       props      dbms_aq.message_properties_t;
       enqopts    dbms_aq.enqueue_options_t;
    begin
       if get_running_jobs > 0
       then
          raise jobs_are_running;
       end if;

       begin
          sys.dbms_aq.enqueue ('user2.sync_message_q',
             enqopts,
             props,
             job_message_type (upper (p_command)),
             my_msgid);
       end;

       commit;
    exception
       when jobs_are_running
       then
          raise_application_error (jobs_are_running_nbr,
             jobs_are_running_msg);
       when others
       then
          raise;
    end job_run_cmd;
end job_api_pkg;
/

/* 
   Step 6: Create the scheduled job - have the job keyed to a specific 
           message in the queue 
*/

begin
   sys.dbms_scheduler.drop_job (
   job_name   => 'USER2.SAMPLE_JOB');
end;
/

begin
   sys.dbms_scheduler.create_job (
   job_name          => 'USER2.SAMPLE_JOB',
   start_date        => null,
   event_condition   => 'tab.user_data.message = ''SAMPLE_JOB''',
   queue_spec        => 'USER2.SYNC_MESSAGE_Q',
   end_date          => null,
   program_name      => 'USER2.SAMPLE_PROGRAM',
   comments          => null);
   sys.dbms_scheduler.set_attribute (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'RESTARTABLE',
   value       => false);
   sys.dbms_scheduler.set_attribute (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'LOGGING_LEVEL',
   value       => sys.dbms_scheduler.logging_runs);
   sys.dbms_scheduler.set_attribute_null (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'MAX_FAILURES');
   sys.dbms_scheduler.set_attribute_null (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'MAX_RUNS');
   sys.dbms_scheduler.set_attribute (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'STOP_ON_WINDOW_CLOSE',
   value       => false);
   sys.dbms_scheduler.set_attribute (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'JOB_PRIORITY',
   value       => 3);
   sys.dbms_scheduler.set_attribute_null (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'SCHEDULE_LIMIT');
   sys.dbms_scheduler.set_attribute (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'AUTO_DROP',
   value       => false);
   sys.dbms_scheduler.set_attribute (
   name        => 'USER2.SAMPLE_JOB',
   attribute   => 'RAISE_EVENTS',
   value       =>   sys.dbms_scheduler.job_succeeded
                  + sys.dbms_scheduler.job_failed
                  + sys.dbms_scheduler.job_broken
                  + sys.dbms_scheduler.job_completed
                  + sys.dbms_scheduler.job_sch_lim_reached
                  + sys.dbms_scheduler.job_chain_stalled);
   sys.dbms_scheduler.enable (name => 'USER2.SAMPLE_JOB');
end;
/
/* 
   Step 7: grant execute on the package to any schema that needs to 
           call a job 
*/

grant execute on USER2.job_api_pkg to USER1;
 
/* 
   Step 8: This command will place the call in the queue and execute 
           the job. It can be executed as part of a job chain, as a 
           dynamic action from an APEX app, etc. 
*/

execute USER2.job_api_pkg.data_run_cmd('SAMPLE_JOB');

 

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.