Oracle Streams is an outstanding way to get data
from one server to another. It is NOTHING like the
advanced replication of old, which is why I call it
“outstanding” and not “horribly inefficient and
error-prone, not to mention annoying.”
The “Oracle
Streams” book has a complete step-by-step troubleshooting guide
for fixing problems in Streams propagation.If you
don’t know anything about Streams, you can catch up
by reading a
good book, use a
quick tip, or read
some documentation.
Recently I was asked how to implement a custom
INSERT handler for a streams setup. The client had
the following situation:
- A production table that housed important
data
- A history table that was inserted into every
time a record on the production table changed
- A remote database meant to hold history
The client wished to set up Streams as follows:
- On insert to the history table, attempt to
stream the row across to the target DB
- On successful apply to the target DB, remove
the row from the source DB history table
- Do not propagate deletes from the source to
the target
Thankfully, it was pretty easy. We used a
combination of a negative propagation rule for
deletes and an apply-side DML handler for inserts.
Let’s take a look.
First Step: Halt Propagation of Deletes
This was accomplished using the
ADD_TABLE_PROPAGATION_RULES procedure of the
DBMS_STREAMS_ADM package with a negative rule on
DELETE. A negative rule simply means that we want to
set inclusion to FALSE if the condition is TRUE.
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'streams_test.test_hist',
streams_name => 'prop_test',
source_queue_name => 'streamsadm.rep_capture_queue',
destination_queue_name => 'streamsadm.rep_dest_queue@targetdb',
include_dml => true,
include_ddl => true,
source_database => 'primarydb',
inclusion_rule => false,
and_condition => ':lcr.GET_COMMAND_TYPE() = ''DELETE''');
END;
/
As you can see, we have set up a rule for stream
prop_test, on table streams_test.test_hist. This
rule applies to rows coming from the
rep_capture_queue locally and going to the
rep_dest_queue on the target database. We say to
include DML, and include DDL; however, we also say
in the and_condition that if the LCR (Logical Change
Request) type is DELETE, to NOT include it (inclusion_rule
=> false).
That takes care of deletes. No deletes on that
table will propagate, though anything else will.
Create a Handler on the Target
The next step was to create a procedure on the
target database that would be used as an Apply
handler. This procedure is owned by streams_test,
the actual user being streamed.
create or replace procedure streams_test.del_orig_lcr (in_any in sys.anydata) is
lcr sys.lcr$_row_record;
lcrrow sys.lcr$_row_list;
rc pls_integer;
pkdata sys.anydata;
pkval number;
begin
rc := in_any.getobject(lcr);
lcrrow := lcr.GET_VALUES('NEW');
lcr.execute(true);
for i in 1..lcrrow.count
loop
if lcrrow(i).column_name = 'ID' THEN
pkdata := lcrrow(i).data;
rc := pkdata.getNumber(pkval);
delete from streams_test.test_hist@primarydb where id = pkval;
commit;
end if;
end loop;
end;
/
This little piece of code will accept an LCR
(Logical Change Request), and extract the values.
First, it executes the LCR on the apply side. If
that is successful, it finds the primary key column
(in this case ID), and deletes it from the remote
database. Note that in order to do the delete, you
will need a database link pointing back to the
source DB. When the delete is processed on the
source database, it wont propagate over to the
target because of the rule we made above. If the
insert to the target does not work, it will not
delete from the source. We have covered all our
bases to have a process that will ONLY delete the
original if the apply works properly.
Last Step: The DML Handler
The last step is to tell Oracle that it should do
something special when the apply process runs. To
accomplish this, we will set up an apply handler on
the target database (not the source):
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'streams_test.test_hist',
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => false,
user_procedure => 'streams_test.del_orig_lcr',
apply_database_link => null,
apply_name => null);
END;
/
As you can see, we tell the apply that if DML
comes in of type INSERT, to run the
streams_test.del_orig_lcr procedure. We’re rocking
and rolling now!
The Test - Finale
The following test was run on the primary
database:
SQL> select * from streams_test.test_hist;
no rows selected
SQL> insert into streams_test.test_hist values (2, 'Jeff', sysdate);
1 row created.
SQL> commit;
Commit complete.
SQL> select * from streams_test.test_hist;
no rows selected
SQL> insert into streams_test.test_hist values (2, 'Kristina', sysdate);
1 row created.
SQL> commit;
Commit complete.
SQL> select * from streams_test.test_hist;
no rows selected
SQL> select * from streams_test.test_hist@targetdb;
ID CHARFIELD THEDATE
---------- -------------------------------------------------- ---------
1 Steve 10-NOV-06
2 Kristina 10-NOV-06
SQL> insert into streams_test.test_hist values (2, 'Duplicate', sysdate);
1 row created.
SQL> commit;
Commit complete.
SQL> select * from streams_test.test_hist;
ID CHARFIELD THEDATE
---------- -------------------------------------------------- ---------
2 Duplicate 10-NOV-06
SQL> select * from streams_test.test_hist@targetdb;
ID CHARFIELD THEDATE
---------- -------------------------------------------------- ---------
1 Steve 10-NOV-06
2 Kristina 10-NOV-06
Conclusion
We inserted two records on the primary (source),
and both of them propagated across to the target and
deleted on the source side. However, when we tried
to force an error with a duplicate entry for the PK
on the ID column, notice that the row stayed in the
table on the primary. This way, we are always
guaranteed to keep our data, even if there is a
streams error or some other kind of issue.