Before reading this post it will be helpful to read Oracle CDC (Change Data Capture) with Materialized View Log and Oracle Synchronous CDC (Change Data Capture).
Let's start by creating the table below.
CREATE TABLE customer (
cust_id NUMBER,
cust_name VARCHAR2(50),
cust_type NUMBER,
birth_date DATE
);
You can list the existing change sets with the query below.
select * from change_sets;
Next, create a change set named HOTLOG_SET on the predefined HOTLOG_SOURCE change source.
begin
dbms_cdc_publish.create_change_set(
change_set_name => 'HOTLOG_SET',
description => 'HOTLOG CHANGE SET',
change_source_name => 'HOTLOG_SOURCE'
);
end;
Then enable change capture on the change set.
BEGIN
DBMS_LOGMNR_CDC_PUBLISH.alter_change_set(change_set_name => 'HOTLOG_SET', enable_capture => 'Y',
stop_on_ddl => 'Y');
END;
The code below creates a change table for the customer table. The change table stores every DML statement executed against the source table.
BEGIN
DBMS_LOGMNR_CDC_PUBLISH.create_change_table
(owner => sys_context('USERENV','SESSION_USER') ,
source_schema => sys_context('USERENV','SESSION_SCHEMA') ,
change_set_name => 'HOTLOG_SET',
capture_values => 'both',
rs_id => 'y',
row_id => 'n',
user_id => 'y',
TIMESTAMP => 'y',
object_id => 'n',
source_colmap => 'n',
target_colmap => 'n',
options_string => NULL,
source_table => 'CUSTOMER',
change_table_name => 'CDC_CUSTOMER',
column_type_list => 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE'
);
END;
Let's execute some DML statements on the customer table.
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
VALUES (1, 'John', 1, TO_DATE ('25/02/1978', 'DD/MM/YYYY'));
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
VALUES (2, 'Gabriel', 5, TO_DATE ('25/02/1985', 'DD/MM/YYYY'));
UPDATE customer
SET cust_type = 7,
birth_date = TO_DATE ('25/02/1982', 'DD/MM/YYYY')
WHERE cust_id = 1;
DELETE FROM customer
WHERE cust_id = 2;
COMMIT;
We can easily see the captured changes by querying the CDC table.
select * from customer;
select * from cdc_customer;
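Besides the source columns, the change table carries control columns that describe each change. The query below is a sketch (the exact set of control columns depends on the options passed to create_change_table above); it lists the operation type and commit SCN for every captured row.
-- OPERATION$ is 'I ' for an insert, 'UO'/'UN' for the old/new image of an
-- update and 'D ' for a delete; CSCN$ is the commit SCN of the change
SELECT operation$, cscn$, commit_timestamp$, username$,
cust_id, cust_name, cust_type, birth_date
FROM cdc_customer
ORDER BY cscn$, rsid$;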
We also need to create a subscription for this change table.
declare
vSubhandle NUMBER;
v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
col_names VARCHAR2(32767);
BEGIN
DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(CHANGE_SET_NAME => 'HOTLOG_SET',
DESCRIPTION => v_subscription_description, SUBSCRIPTION_NAME => v_subscription_description);
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
col_names := 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE';
DBMS_LOGMNR_CDC_SUBSCRIBE.SUBSCRIBE (vSubhandle
, sys_context('USERENV','SESSION_SCHEMA'), 'CUSTOMER', col_names);
DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(vSubhandle);
END;
We can view the subscription in the all_subscriptions view.
SELECT handle, set_name, created, status, earliest_scn, latest_scn,
last_purged, last_extended
FROM all_subscriptions
We have to extend the subscription window to see new change data. After extending, we can retrieve the records from the cdc_customer table whose cscn$ falls between the subscription's earliest_scn and latest_scn. These changes can then be applied to the destination database; after they are applied successfully, we should purge the subscription window.
Extending subscription
declare
vSubhandle NUMBER;
v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
-- Extend the window for subscription
DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE=>vSubhandle);
END;
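After extending the window, the changes that belong to the current window can be read from the change table. The query below is a sketch that joins all_subscriptions to pick up the window boundaries for the SUB_CUSTOMER subscription created above.
SELECT c.operation$, c.cscn$, c.cust_id, c.cust_name, c.cust_type, c.birth_date
FROM cdc_customer c, all_subscriptions s
WHERE s.description = 'SUB_CUSTOMER'
AND c.cscn$ BETWEEN s.earliest_scn AND s.latest_scn
ORDER BY c.cscn$;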
Purging subscription
declare
vSubhandle NUMBER;
v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
vSubhandle :=0;
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
SUBSCRIPTION_HANDLE=> vSubhandle);
END;
Tuesday, February 9, 2010
Oracle Synchronous CDC (Change Data Capture)
Data warehouses are designed for analytical reports and analysis. We could of course run these reports directly on the OLTP system, but that would degrade the performance of the operational system. Usually end-of-day data is enough for analysis and analytical reports, so it is enough to load end-of-day data into the data warehouse. Copying all records from the source tables to the destination tables is not a good way to load the warehouse: you can't transfer, say, 15 million rows from a source table to a destination table every day. It takes time and is also unnecessary. A better approach is to extract only the data that has changed and apply those changes to the destination tables. Oracle Synchronous CDC (Change Data Capture) is one of these data extraction methods.
Don't forget that companies usually have more than one operational system, which means you have to extract data from multiple OLTP systems into your data warehouse.
You may also want to look at CDC (Change Data Capture) with Materialized View Log.
Let's start by creating the table below.
CREATE TABLE customer (
cust_id NUMBER,
cust_name VARCHAR2(50),
cust_type NUMBER,
birth_date DATE
);
The code below creates a change table for the customer table in SYNC_SET, the predefined change set for synchronous CDC. The change table stores every DML statement executed against the source table.
BEGIN
DBMS_LOGMNR_CDC_PUBLISH.create_change_table
(owner => sys_context('USERENV','SESSION_USER') ,
source_schema => sys_context('USERENV','SESSION_SCHEMA') ,
change_set_name => 'SYNC_SET',
capture_values => 'both',
rs_id => 'y',
row_id => 'n',
user_id => 'y',
TIMESTAMP => 'y',
object_id => 'n',
source_colmap => 'n',
target_colmap => 'n',
options_string => NULL,
source_table => 'CUSTOMER',
change_table_name => 'CDC_CUSTOMER',
column_type_list => 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE'
);
END;
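You can confirm that the change table was created by querying the CDC dictionary (a sketch; the change_tables view, or all_change_tables, lists the change tables visible to you).
select * from change_tables;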
Let's execute some DML statements on the customer table.
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
VALUES (1, 'John', 1, TO_DATE ('25/02/1978', 'DD/MM/YYYY'));
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
VALUES (2, 'Gabriel', 5, TO_DATE ('25/02/1985', 'DD/MM/YYYY'));
UPDATE customer
SET cust_type = 7,
birth_date = TO_DATE ('25/02/1982', 'DD/MM/YYYY')
WHERE cust_id = 1;
DELETE FROM customer
WHERE cust_id = 2;
COMMIT;
We can easily see the captured changes by querying the CDC table.
select * from customer;
select * from cdc_customer;
We also need to create a subscription for this change table.
declare
vSubhandle NUMBER;
v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
col_names VARCHAR2(32767);
BEGIN
DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(CHANGE_SET_NAME => 'SYNC_SET',
DESCRIPTION => v_subscription_description, SUBSCRIPTION_NAME => v_subscription_description);
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
col_names := 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE';
DBMS_LOGMNR_CDC_SUBSCRIBE.SUBSCRIBE (vSubhandle
, sys_context('USERENV','SESSION_SCHEMA'), 'CUSTOMER', col_names);
DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(vSubhandle);
END;
We can view the subscription in the all_subscriptions view.
SELECT handle, set_name, created, status, earliest_scn, latest_scn,
last_purged, last_extended
FROM all_subscriptions
We have to extend the subscription window to see new change data. After extending, we can retrieve the records from the cdc_customer table whose cscn$ falls between the subscription's earliest_scn and latest_scn. These changes can then be applied to the destination database; after they are applied successfully, we should purge the subscription window.
Extending subscription
declare
vSubhandle NUMBER;
v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
-- Extend the window for subscription
DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE=>vSubhandle);
END;
Purging subscription
declare
vSubhandle NUMBER;
v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
vSubhandle :=0;
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
SUBSCRIPTION_HANDLE=> vSubhandle);
END;
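Putting the pieces together, a consumer job could run the whole cycle in one block. The sketch below assumes the SUB_CUSTOMER subscription created above and only counts the rows in the window; a real job would insert them into the destination tables before purging.
declare
vSubhandle NUMBER;
vCnt NUMBER;
begin
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = 'SUB_CUSTOMER';
-- make the newest committed changes visible to the subscription
DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(subscription_handle => vSubhandle);
-- read the changes that belong to the current window
SELECT COUNT(*) INTO vCnt
FROM cdc_customer c, all_subscriptions s
WHERE s.handle = vSubhandle
AND c.cscn$ BETWEEN s.earliest_scn AND s.latest_scn;
dbms_output.put_line(vCnt || ' change rows in the current window');
-- ... apply the rows to the destination tables here ...
-- release the processed changes
DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(subscription_handle => vSubhandle);
end;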
Monday, February 8, 2010
Oracle CDC (Change Data Capture) with Materialized View Log
With a materialized view log we can capture changed data in a table. Below is a simple implementation of this.
Assume that we have a table with the structure below.
CREATE TABLE customer (
cust_id NUMBER,
cust_name VARCHAR2(50),
cust_type NUMBER,
birth_date DATE
);
First we should create a materialized view log on this table.
CREATE MATERIALIZED VIEW LOG ON customer WITH
ROWID, SEQUENCE (cust_id, cust_name, cust_type, birth_date)
INCLUDING NEW VALUES;
After this, a table named MLOG$_CUSTOMER is created automatically, and every DML statement on the customer table is captured in this materialized view log table.
Let's execute some DML statements.
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
VALUES (1, 'John', 1, TO_DATE ('25/02/1978', 'DD/MM/YYYY'));
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
VALUES (2, 'Gabriel', 5, TO_DATE ('25/02/1985', 'DD/MM/YYYY'));
UPDATE customer
SET cust_type = 7,
birth_date = TO_DATE ('25/02/1982', 'DD/MM/YYYY')
WHERE cust_id = 1;
DELETE FROM customer
WHERE cust_id = 2;
COMMIT;
We can easily see the captured changes by querying the tables below.
select * from customer;
select * from MLOG$_customer;
Finally, with the statement below we can reconstruct the DML statements in execution order, pairing the old and new images of each update.
SELECT a1.sequence$$, a1.dmltype$$, a1.cust_id, a1.cust_name, a1.cust_type,
a1.birth_date, a2.cust_id cust_id_n, a2.cust_name cust_name_n,
a2.cust_type cust_type_n, a2.birth_date birth_date_n
FROM mlog$_customer a1, mlog$_customer a2
WHERE a1.dmltype$$ = 'U'
AND a1.old_new$$ = 'U'
AND a2.dmltype$$ = 'U'
AND a2.old_new$$ = 'N'
AND a1.m_row$$ = a2.m_row$$
AND a1.sequence$$ + 1 = a2.sequence$$
UNION
SELECT a1.sequence$$, a1.dmltype$$, a1.cust_id, a1.cust_name, a1.cust_type,
a1.birth_date, NULL, NULL, NULL, NULL
FROM mlog$_customer a1
WHERE dmltype$$ = 'I'
UNION
SELECT a1.sequence$$, a1.dmltype$$, a1.cust_id, a1.cust_name, a1.cust_type,
a1.birth_date, NULL, NULL, NULL, NULL
FROM mlog$_customer a1
WHERE dmltype$$ = 'D'
ORDER BY sequence$$
You can use this method to transport data from an OLTP system to a staging area (data warehouse).
The materialized view log uses triggers to feed the log table.
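As a sketch of that idea, the captured changes can be copied into a staging table and the processed log rows removed afterwards. The STG_CUSTOMER_CHG table and the direct delete from the log are assumptions for illustration, not part of the original post; deleting from MLOG$_CUSTOMER is only safe when no materialized view depends on the log.
-- staging table with the same shape as the log rows we want to keep
CREATE TABLE stg_customer_chg AS
SELECT sequence$$, dmltype$$, old_new$$, cust_id, cust_name, cust_type, birth_date
FROM mlog$_customer WHERE 1 = 0;
-- copy the captured changes to the staging area
INSERT INTO stg_customer_chg
SELECT sequence$$, dmltype$$, old_new$$, cust_id, cust_name, cust_type, birth_date
FROM mlog$_customer;
-- clear the processed rows from the log
DELETE FROM mlog$_customer;
COMMIT;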
Monday, December 14, 2009
How to generate insert statements of a table in Oracle?
This code snippet takes a table name as its only parameter and generates INSERT statements for the current data in that table. The DBMS_SQL package is used to detect the column types and retrieve the data. You may want to read the "Is it possible to return table from oracle function?" post before examining this snippet. The code works on Oracle 10g or higher.
PACKAGE PKG_UTIL2 IS
TYPE record_type IS RECORD (strSql varchar2(8192));
TYPE tbl_record_type IS TABLE OF record_type;
FUNCTION f_get_tbl_insert_dml(p_table_name in varchar2
) RETURN tbl_record_type PIPELINED;
END; -- Package spec
PACKAGE BODY PKG_UTIL2 IS
function f_get_tbl_insert_dml(p_table_name in varchar2
) return tbl_record_type pipelined as
out_rec record_type;
str_sql varchar2(8192);
desc_tab dbms_sql.desc_tab;
col_cnt number;
cur_id NUMBER;
namevar VARCHAR2(32767);
numvar NUMBER;
datevar DATE;
opr varchar2(4000);
ret_val varchar2(8192);
tbl_col_cnt number;
begin
str_sql := 'select * from '||p_table_name;
cur_id := dbms_sql.open_cursor;
dbms_sql.parse( c => cur_id, statement => str_sql, language_flag => dbms_sql.native);
dbms_sql.describe_columns( c => cur_id, col_cnt => col_cnt, desc_t => desc_tab);
FOR i IN 1 .. col_cnt LOOP
IF desc_tab(i).col_type = 2 THEN
DBMS_SQL.DEFINE_COLUMN(cur_id, i, numvar);
ELSIF desc_tab(i).col_type = 12 THEN
DBMS_SQL.DEFINE_COLUMN(cur_id, i, datevar);
ELSE
DBMS_SQL.DEFINE_COLUMN(cur_id, i, namevar, 4000);
END IF;
END LOOP;
if DBMS_SQL.execute(cur_id) = 0 then
while DBMS_SQL.FETCH_ROWS(cur_id) > 0 loop
ret_val := 'INSERT INTO ' || p_table_name || ' VALUES (';
for i in 1..col_cnt loop
if desc_tab(i).col_type = 2 then --number
DBMS_SQL.COLUMN_VALUE(cur_id, i, numvar);
if numvar is null then
ret_val := ret_val || 'null,';
else
ret_val := ret_val || replace(to_char(numvar),',','.') || ',';
end if;
elsif desc_tab(i).col_type = 12 then --date
DBMS_SQL.COLUMN_VALUE(cur_id, i, datevar);
if datevar is null then
ret_val := ret_val || 'null,';
else
if datevar = trunc(datevar) then
ret_val := ret_val || 'to_date('''
|| to_char(datevar, 'DD/MM/YYYY') ||''',''DD/MM/YYYY''),';
else
ret_val := ret_val || 'to_date('''
|| to_char(datevar, 'DD/MM/YYYY hh24:mi:ss') ||''',''DD/MM/YYYY hh24:mi:ss''),';
end if;
end if;
else --varchar2, char, others
DBMS_SQL.COLUMN_VALUE(cur_id, i, namevar);
if namevar is null then
ret_val := ret_val || 'null,';
else
ret_val := ret_val || '''' || replace(namevar, '''', '''''') || ''','; -- escape embedded quotes
end if;
end if;
end loop;
ret_val := substr(ret_val,1,length(ret_val)-1) || ')';
out_rec.strSql := ret_val;
pipe row (out_rec);
end loop;
end if;
DBMS_SQL.CLOSE_CURSOR(cur_id);
return;
end;
END;
Usage
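For the CUSTOMER table used in the earlier posts, the call would look like this (a sketch; pass the name of any table you want to script out).
select * from table(PKG_UTIL2.f_get_tbl_insert_dml('CUSTOMER'))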
Tuesday, December 8, 2009
Is it possible to return table from oracle function?
In Oracle 9i and earlier versions you could return a table from a function by creating a SQL object type and a pipelined function that returns a collection of that type. In databases older than 10g the type had to be created as a schema object, which made it awkward to change the return structure: you had to drop and recreate the type.
In Oracle 10g and higher you can return a table from a function without creating any type in the database, by declaring a record type in the package specification. The code below works on Oracle 10g or higher.
PACKAGE PKG_PIPELINE IS
TYPE record_type IS RECORD (
string varchar2(8192),
nmbr number,
dt date
);
TYPE tbl_record_type IS TABLE OF record_type;
function f_test(cnt in number) RETURN tbl_record_type PIPELINED;
END; -- Package spec
PACKAGE BODY PKG_PIPELINE IS
function f_test(cnt in number) RETURN tbl_record_type PIPELINED as
PRAGMA AUTONOMOUS_TRANSACTION;
out_rec record_type;
begin
out_rec.string := 'A';
out_rec.nmbr := 5;
out_rec.dt := sysdate;
pipe row (out_rec);
for i in 1..2*cnt loop
out_rec.string := 'B';
out_rec.nmbr := i;
out_rec.dt := sysdate-1;
pipe row (out_rec);
end loop;
return;
end;
END;
Usage
select * from table(PKG_PIPELINE.f_test(3))