Conditional Job Runs Using Oracle Advanced Queuing
In this method, all tasks in the chain are
scheduled as regular repeating jobs. When a task completes
successfully, it places a message on a queue for the next task to
read. With the exception of the first task, the first
operation each task performs is to read from its queue. If there is
a message on the queue, the task can proceed; otherwise, it waits
indefinitely for the message to arrive.
Before any code can be written, some background information
must be introduced and a queuing infrastructure must be set up
using the job_chain_aq_setup.sql script. A full
introduction to Oracle Advanced Queuing is beyond the scope of this
book, so explanations will be limited to just those elements
necessary to build a simple working system.
*
job_chain_aq_setup.sql
-- Grant necessary permissions
conn sys/password as sysdba

-- Create the queue payload
CREATE OR REPLACE TYPE job_user.job_chain_msg_type AS OBJECT (
  message VARCHAR2(10)
)
/

-- Create the queue table and queues
BEGIN
  DBMS_AQADM.create_queue_table (
    queue_table        => 'job_user.job_chain_queue_tab',
    queue_payload_type => 'job_user.job_chain_msg_type');

  DBMS_AQADM.create_queue (
    queue_name  => 'job_user.task_2_queue',
    queue_table => 'job_user.job_chain_queue_tab');

  DBMS_AQADM.create_queue (
    queue_name  => 'job_user.task_3_queue',
    queue_table => 'job_user.job_chain_queue_tab');

  DBMS_AQADM.start_queue (
    queue_name => 'job_user.task_2_queue',
    enqueue    => TRUE);

  DBMS_AQADM.start_queue (
    queue_name => 'job_user.task_3_queue',
    enqueue    => TRUE);
END;
/

grant execute on dbms_aq to job_user;

conn job_user/job_user
Advanced Queuing (AQ) is Oracle's
implementation of a messaging system which can be used as a
replacement for the dbms_pipe package and other bespoke solutions.
The basic unit of any messaging system is a message with the most
important element of the message being its contents, or payload.
In order to define a queue table, the payload of the messages that
will be stored within it must first be defined. The
job_chain_aq_setup.sql script contains a definition of an object
type called job_chain_msg_type that will act as the payload.
The creation of object types requires the CREATE TYPE privilege.
The payload of the message can be as simple or
complicated as desired. In this case, the only concern is that
the message has been sent. The particular contents are not
important at this time, so the message is extremely simple.
Administration of queues is done using the
dbms_aqadm package and requires the aq_administrator_role to be
granted to the administrator. Alternatively, all
administration can be performed by a privileged user such as SYS or
SYSTEM. With the payload object defined, the queue table is
created using the create_queue_table procedure.
Once the queue table has been created, the
individual queues are created and started using the create_queue and
start_queue procedures respectively. A single queue table can
hold many queues as long as each queue uses the same type for its
payload.
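For completeness, the objects created above can be removed in the reverse order. The following is a minimal sketch rather than part of the original scripts. It assumes the object names used in job_chain_aq_setup.sql and a user with AQ administration privileges. Queues must be stopped before they can be dropped, and the queue table is dropped only after its queues are gone.

```sql
-- Sketch only: teardown for the objects created by job_chain_aq_setup.sql.
-- Assumes it is run by a user with AQ administration privileges.
BEGIN
  -- Stop both queues so they no longer accept enqueues or dequeues.
  DBMS_AQADM.stop_queue (queue_name => 'job_user.task_2_queue');
  DBMS_AQADM.stop_queue (queue_name => 'job_user.task_3_queue');

  -- Drop the stopped queues.
  DBMS_AQADM.drop_queue (queue_name => 'job_user.task_2_queue');
  DBMS_AQADM.drop_queue (queue_name => 'job_user.task_3_queue');

  -- Drop the queue table once it no longer holds any queues.
  DBMS_AQADM.drop_queue_table (queue_table => 'job_user.job_chain_queue_tab');
END;
/

-- Finally, drop the payload type.
DROP TYPE job_user.job_chain_msg_type;
```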
Messages are queued and de-queued using the
dbms_aq package. Access to this package can be granted using
the aq_user_role role. However, privileges granted through a role
are not available inside a definer's rights stored procedure, so the
job_chain_aq_setup.sql script grants the execute privilege on the
package directly to the test user.
The contents of the queue table can be
monitored using the job_chain_aq_query.sql script.
*
job_chain_aq_query.sql
select
  queue,
  count(*) as messages
from
  aq$job_chain_queue_tab
group by
  queue
order by
  queue
;
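When a chain appears to be stuck, it can help to see the state and age of the waiting messages as well as their number. The query below is a hedged variation on the script above, not one of the book's scripts. It assumes the AQ$ view for job_chain_queue_tab exposes the MSG_STATE and ENQ_TIME columns.

```sql
-- Sketch only: message counts per queue and state, with the oldest
-- enqueue time. Assumes MSG_STATE and ENQ_TIME are present in the view.
select
  queue,
  msg_state,
  count(*)      as messages,
  min(enq_time) as oldest_message
from
  aq$job_chain_queue_tab
group by
  queue,
  msg_state
order by
  queue,
  msg_state
;
```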
With the queuing infrastructure in place, the specific example
can now be coded. The job_chain_aq.sql script creates a package
specification and body that will do all the work for the example job
chain.
*
job_chain_aq.sql
CREATE OR REPLACE PACKAGE job_chain_aq AS

  PROCEDURE task_1;
  PROCEDURE task_2;
  PROCEDURE task_3;
  PROCEDURE enqueue_message (p_queue_name IN VARCHAR2);
  PROCEDURE dequeue_message (p_queue_name IN VARCHAR2);

END job_chain_aq;
/
SHOW ERRORS
CREATE OR REPLACE PACKAGE BODY job_chain_aq AS

-- -----------------------------------------------------------------
PROCEDURE task_1 AS
-- -----------------------------------------------------------------
BEGIN
  DELETE FROM job_chain;

  INSERT INTO job_chain (created_timestamp, task_name)
  VALUES (systimestamp, 'TASK_1');
  COMMIT;

  -- Uncomment the following line to force a failure.
  --RAISE_APPLICATION_ERROR(-20000,
  --  'This is a fake error to prevent task_2 being executed');

  -- The work has completed successfully so signal task_2.
  enqueue_message (p_queue_name => 'task_2_queue');
EXCEPTION
  WHEN OTHERS THEN
    -- Don't signal task_2.
    NULL;
END task_1;
-- -----------------------------------------------------------------

-- -----------------------------------------------------------------
PROCEDURE task_2 AS
-- -----------------------------------------------------------------
BEGIN
  -- Wait for the signal from task_1 before doing any work.
  dequeue_message (p_queue_name => 'task_2_queue');

  INSERT INTO job_chain (created_timestamp, task_name)
  VALUES (systimestamp, 'TASK_2');
  COMMIT;

  -- Uncomment the following line to force a failure.
  --RAISE_APPLICATION_ERROR(-20000,
  --  'This is a fake error to prevent task_3 being executed');

  -- The work has completed successfully so signal task_3.
  enqueue_message (p_queue_name => 'task_3_queue');
EXCEPTION
  WHEN OTHERS THEN
    -- Don't signal task_3.
    NULL;
END task_2;
-- -----------------------------------------------------------------

-- -----------------------------------------------------------------
PROCEDURE task_3 AS
-- -----------------------------------------------------------------
BEGIN
  -- Wait for the signal from task_2 before doing any work.
  dequeue_message (p_queue_name => 'task_3_queue');

  INSERT INTO job_chain (created_timestamp, task_name)
  VALUES (systimestamp, 'TASK_3');
  COMMIT;
END task_3;
-- -----------------------------------------------------------------

-- -----------------------------------------------------------------
PROCEDURE enqueue_message (p_queue_name IN VARCHAR2) AS
-- -----------------------------------------------------------------
  l_enqueue_options    DBMS_AQ.enqueue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_job_chain_msg      job_chain_msg_type;
BEGIN
  -- The contents are unimportant; the arrival of the message is the signal.
  l_job_chain_msg := job_chain_msg_type('GO');

  DBMS_AQ.enqueue(
    queue_name         => 'job_user.' || p_queue_name,
    enqueue_options    => l_enqueue_options,
    message_properties => l_message_properties,
    payload            => l_job_chain_msg,
    msgid              => l_message_handle);
END enqueue_message;
-- -----------------------------------------------------------------

-- -----------------------------------------------------------------
PROCEDURE dequeue_message (p_queue_name IN VARCHAR2) AS
-- -----------------------------------------------------------------
  l_dequeue_options    DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_job_chain_msg      job_chain_msg_type;
BEGIN
  -- With default dequeue options, this call blocks until a message arrives.
  DBMS_AQ.dequeue(
    queue_name         => 'job_user.' || p_queue_name,
    dequeue_options    => l_dequeue_options,
    message_properties => l_message_properties,
    payload            => l_job_chain_msg,
    msgid              => l_message_handle);
END dequeue_message;
-- -----------------------------------------------------------------

END job_chain_aq;
/
SHOW ERRORS
Next, the jobs associated with each task are
scheduled. Unlike the previous example, the job sequence is
protected by the queue, so all the jobs can be enabled.
*
job_chain_aq_jobs.sql
-- Oracle10g
BEGIN
  DBMS_SCHEDULER.create_job (
    job_name        => 'job_chain_aq_task_1',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'job_chain_aq.task_1',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'freq=daily; byhour=6; byminute=0; bysecond=0;',
    end_date        => NULL,
    enabled         => TRUE,
    comments        => 'First task in the AQ chain.');
END;
/

BEGIN
  DBMS_SCHEDULER.create_job (
    job_name        => 'job_chain_aq_task_2',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'job_chain_aq.task_2',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'freq=daily; byhour=12; byminute=0; bysecond=0;',
    end_date        => NULL,
    enabled         => TRUE,
    comments        => 'Second task in the AQ chain.');
END;
/

BEGIN
  DBMS_SCHEDULER.create_job (
    job_name        => 'job_chain_aq_task_3',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'job_chain_aq.task_3',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'freq=daily; byhour=18; byminute=0; bysecond=0;',
    end_date        => NULL,
    enabled         => TRUE,
    comments        => 'Third task in the AQ chain.');
END;
/

EXEC DBMS_SCHEDULER.run_job ('job_chain_aq_task_1');

-- Pre Oracle10g
/*
BEGIN
  DBMS_JOB.isubmit (
    job       => 1000,
    what      => 'BEGIN job_chain_aq.task_1; END;',
    next_date => SYSDATE,
    interval  => 'TRUNC(SYSDATE) + INTERVAL ''1 6'' DAY TO HOUR');

  DBMS_JOB.isubmit (
    job       => 1001,
    what      => 'BEGIN job_chain_aq.task_2; END;',
    next_date => SYSDATE,
    interval  => 'TRUNC(SYSDATE) + INTERVAL ''1 12'' DAY TO HOUR');

  DBMS_JOB.isubmit (
    job       => 1002,
    what      => 'BEGIN job_chain_aq.task_3; END;',
    next_date => SYSDATE,
    interval  => 'TRUNC(SYSDATE) + INTERVAL ''1 18'' DAY TO HOUR');

  COMMIT;
END;
/
*/
-- Cleanup
/*
-- Oracle10g
BEGIN
DBMS_SCHEDULER.drop_job ('job_chain_aq_task_3');
DBMS_SCHEDULER.drop_job ('job_chain_aq_task_2');
DBMS_SCHEDULER.drop_job ('job_chain_aq_task_1');
END;
/
-- Pre Oracle10g
BEGIN
DBMS_JOB.remove(1002);
DBMS_JOB.remove(1001);
DBMS_JOB.remove(1000);
COMMIT;
END;
/
*/
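Once the jobs have been created, their definitions and run history can be checked from the scheduler's dictionary views. The queries below are a minimal sketch and not one of the book's scripts. They assume the Oracle10g jobs were created by the current user, so the USER_SCHEDULER_JOBS and USER_SCHEDULER_JOB_RUN_DETAILS views apply.

```sql
-- Sketch only: confirm the three jobs exist, are enabled, and see when
-- each is next due to run. Unquoted job names are stored in upper case.
select
  job_name,
  enabled,
  state,
  next_run_date
from
  user_scheduler_jobs
where
  job_name like 'JOB_CHAIN_AQ_TASK%'
order by
  job_name
;

-- Sketch only: list the runs recorded in the scheduler log.
select
  job_name,
  status,
  actual_start_date,
  run_duration
from
  user_scheduler_job_run_details
where
  job_name like 'JOB_CHAIN_AQ_TASK%'
order by
  actual_start_date
;
```

Because task_1 and task_2 trap all exceptions, a run may be reported as successful even when the task did not signal the next one, so the queue contents remain the more reliable indicator of chain progress.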
At this point, the tasks are scheduled but have
not been executed; therefore, there are no results in the job_chain
table or the job_chain_queue_tab table. Rather than waiting
until 6:00, the first job is forced to run immediately. The
results below show that the first task has run and there is a
message waiting in the queue table on the task_2_queue.
SQL> exec dbms_scheduler.run_job ('job_chain_aq_task_1');

PL/SQL procedure successfully completed.

job_user@db10g> @job_chain_query.sql

CREATED_TIMESTAMP           TASK_NAME
--------------------------- --------------------
07-AUG-2004 18:18:36.136000 TASK_1

1 row selected.

SQL> @job_chain_aq_query.sql

QUEUE                            MESSAGES
------------------------------ ----------
TASK_2_QUEUE                            1

1 row selected.
If the second job is then forced to run, the results show that the
second task has read a message from its queue, completed its
processing, and placed a message on the queue for the third task.
SQL> exec dbms_scheduler.run_job ('job_chain_aq_task_2');

PL/SQL procedure successfully completed.

SQL> @job_chain_query.sql

CREATED_TIMESTAMP           TASK_NAME
--------------------------- --------------------
07-AUG-2004 18:18:36.136000 TASK_1
07-AUG-2004 18:23:08.771000 TASK_2

2 rows selected.

SQL> @job_chain_aq_query.sql

QUEUE                            MESSAGES
------------------------------ ----------
TASK_3_QUEUE                            1

1 row selected.
If the third job is then forced to run, the results show that the
third task has read a message from its queue and completed its
processing, leaving the queue table empty.
SQL> exec dbms_scheduler.run_job ('job_chain_aq_task_3');

PL/SQL procedure successfully completed.

SQL> @job_chain_query.sql

CREATED_TIMESTAMP           TASK_NAME
--------------------------- --------------------
07-AUG-2004 18:18:36.136000 TASK_1
07-AUG-2004 18:23:08.771000 TASK_2
07-AUG-2004 18:26:04.972000 TASK_3

3 rows selected.

SQL> @job_chain_aq_query.sql

no rows selected
If jobs are started manually out of sequence, their sessions
hang until the appropriate message is placed on the queue.
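If an indefinite wait is not acceptable, the dequeue can be given a time limit. The following is a minimal sketch under stated assumptions, not part of the book's package. The procedure name dequeue_message_with_timeout and the p_wait_seconds parameter are hypothetical. It relies on the wait attribute of dbms_aq.dequeue_options_t, which defaults to waiting forever, and on the ORA-25228 error raised when a dequeue times out.

```sql
-- Sketch only: a variant of dequeue_message that gives up after a timeout.
-- The procedure name and p_wait_seconds parameter are hypothetical.
CREATE OR REPLACE PROCEDURE dequeue_message_with_timeout (
  p_queue_name   IN VARCHAR2,
  p_wait_seconds IN BINARY_INTEGER DEFAULT 60
) AS
  l_dequeue_options    DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_job_chain_msg      job_chain_msg_type;

  -- ORA-25228: timeout or end-of-fetch during message dequeue.
  e_dequeue_timeout EXCEPTION;
  PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228);
BEGIN
  -- Replace the default wait (DBMS_AQ.FOREVER) with a bounded wait in seconds.
  l_dequeue_options.wait := p_wait_seconds;

  DBMS_AQ.dequeue(
    queue_name         => 'job_user.' || p_queue_name,
    dequeue_options    => l_dequeue_options,
    message_properties => l_message_properties,
    payload            => l_job_chain_msg,
    msgid              => l_message_handle);
EXCEPTION
  WHEN e_dequeue_timeout THEN
    -- Report the timeout to the caller instead of hanging.
    RAISE_APPLICATION_ERROR (-20001,
      'No message arrived on ' || p_queue_name || ' within ' ||
      p_wait_seconds || ' seconds.');
END dequeue_message_with_timeout;
/
SHOW ERRORS
```

If task_2 called a procedure like this, its existing exception handler would trap the timeout error, so the task would end without doing its work or signaling task_3. The task_3 procedure has no handler, so there the error would reach the caller.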
This is an excerpt from the book "Oracle
Job Scheduling" by Dr. Tim Hall.