Monday, March 1, 2010

Oracle Asynchronous CDC (Change Data Capture)

Before this post it will be helpful to read Oracle CDC (Change Data Capture) with Materialized View Log and Oracle Synchronous CDC (Change Data Capture).

Lets start with creating table below.
CREATE TABLE customer (
    cust_id NUMBER,
    cust_name VARCHAR2(50),
    cust_type NUMBER,
    birth_date DATE
);
 

select * from change_sets





begin
    dbms_cdc_publish.create_change_set(
        change_set_name => 'HOTLOG_SET',
        description => 'HOTLOG CHANGE SET',
        change_source_name => 'HOTLOG_SOURCE'
    ); 
end;

BEGIN
    DBMS_LOGMNR_CDC_PUBLISH.alter_change_set(change_set_name => 'HOTLOG_SET', enable_capture => 'Y',
        stop_on_ddl     => 'Y');
END;


This code creates a  change table for customer table. Change table stores all dml statements that executed on main table.
BEGIN
   DBMS_LOGMNR_CDC_PUBLISH.create_change_table
                                           (owner                  => sys_context('USERENV','SESSION_USER') ,
                                            source_schema          => sys_context('USERENV','SESSION_SCHEMA') ,
                                            change_set_name        => 'HOTLOG_SET',
                                            capture_values         => 'both',
                                            rs_id                  => 'y',
                                            row_id                 => 'n',
                                            user_id                => 'y',
                                            TIMESTAMP              => 'y',
                                            object_id              => 'n',
                                            source_colmap          => 'n',
                                            target_colmap          => 'n',
                                            options_string         => NULL,
                                            source_table           => 'CUSTOMER',
                                            change_table_name      => 'CDC_CUSTOMER',
                                            column_type_list       => 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE'
                                           );
END;


Lets execute some dml statements on customer table.
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
     VALUES (1,'John', 1, TO_DATE ('25/02/1978', 'DD/MM/YYYY'))

INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
     VALUES (2,'Gabriel', 5, TO_DATE ('25/02/1985', 'DD/MM/YYYY'))

UPDATE customer
   SET cust_type = 7,
       birth_date = TO_DATE ('25/02/1982', 'DD/MM/YYYY')
 WHERE cust_id = 1

DELETE FROM customer
      WHERE cust_id = 2

We can easily see changing records in cdc table.
select * from customer
select * from cdc_customer

We also need to create a subscription for this change table.

declare
    vSubhandle NUMBER;       
    v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
    col_names VARCHAR2(32767);
   
BEGIN
    DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(CHANGE_SET_NAME => 'HOTLOG_SET',
             DESCRIPTION => v_subscription_description, SUBSCRIPTION_NAME => v_subscription_description);            
    SELECT handle INTO vSubhandle FROM all_subscriptions
        WHERE description = v_subscription_description;   
     
    col_names := 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE';
     
    DBMS_LOGMNR_CDC_SUBSCRIBE.SUBSCRIBE (vSubhandle
        , sys_context('USERENV','SESSION_SCHEMA'), 'CUSTOMER', col_names);
     
    DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(vSubhandle);
END;


We can view subscription in all_subscriptions table.
SELECT handle, set_name, created, status, earliest_scn, latest_scn,
       last_purged, last_extended
  FROM all_subscriptions







We have to extend our subscription to capture change data . After extending we can retrieve records from cdc_customer table that have cscn$ between earliest_scn and latest_scn. These change set can be executed on destination database. After a successful execution we should purge the subscription.

Extending subscription
declare
    vSubhandle NUMBER;
    v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
    SELECT handle INTO vSubhandle FROM all_subscriptions
        WHERE description = v_subscription_description;
    -- Extend the window for subscription
    DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE=>vSubhandle);
END;



Purging subscription
declare
    vSubhandle NUMBER;
    v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
    vSubhandle :=0;
    SELECT handle INTO vSubhandle FROM all_subscriptions
        WHERE description = v_subscription_description;
    DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
        SUBSCRIPTION_HANDLE=> vSubhandle);
END;

Tuesday, February 9, 2010

Oracle Synchronous CDC (Change Data Capture)

 
Datawarehouses are designed to generate analytical reports and analysis. We can ofcourse do these analysis and reports on oltp system but this can decrease operational system performance. Usually end-of-day data is enough for analysis and analytical reports. So it is enough to load end-of-day data to datawarehouse. To load datawahouse it is not a good solution to retrieve all records from source tables to destination tables. And this can take much time. For example yo can't transfer 15 million data from source to destination table every day. It takes time and is also unnnecassary. A better way than transporting table data is extracting data that has changed and processing these changes to destination tables. Oracle Synchronous CDC (Change Data Capture) is one of data extraction methods.
Don't forget that companies usually has more than one operational system. That means you had to extract data from multiple oltp systems to your datawarehouse.
You may also want to look at CDC (Change Data Capture) with Materialized View Log.


Lets start with creating table below.
CREATE TABLE customer (
    cust_id NUMBER,
    cust_name VARCHAR2(50),
    cust_type NUMBER,
    birth_date DATE
);

This code creates a  change table for customer table. Change table stores all dml statements that executed on main table.
BEGIN
   DBMS_LOGMNR_CDC_PUBLISH.create_change_table
                                           (owner                  => sys_context('USERENV','SESSION_USER') ,
                                            source_schema          => sys_context('USERENV','SESSION_SCHEMA') ,
                                            change_set_name        => 'SYNC_SET',
                                            capture_values         => 'both',
                                            rs_id                  => 'y',
                                            row_id                 => 'n',
                                            user_id                => 'y',
                                            TIMESTAMP              => 'y',
                                            object_id              => 'n',
                                            source_colmap          => 'n',
                                            target_colmap          => 'n',
                                            options_string         => NULL,
                                            source_table           => 'CUSTOMER',
                                            change_table_name      => 'CDC_CUSTOMER',
                                            column_type_list       => 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE'
                                           );
END;

Lets execute some dml statements on customer table.
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
     VALUES (1,'John', 1, TO_DATE ('25/02/1978', 'DD/MM/YYYY'))

INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
     VALUES (2,'Gabriel', 5, TO_DATE ('25/02/1985', 'DD/MM/YYYY'))

UPDATE customer
   SET cust_type = 7,
       birth_date = TO_DATE ('25/02/1982', 'DD/MM/YYYY')
 WHERE cust_id = 1

DELETE FROM customer
      WHERE cust_id = 2

We can easily see changing records in cdc table.
select * from customer
select * from cdc_customer
 
We also need to create a subscription for this change table.

declare
    vSubhandle NUMBER;       
    v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
    col_names VARCHAR2(32767);
   
BEGIN
    DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(CHANGE_SET_NAME => 'SYNC_SET',
             DESCRIPTION => v_subscription_description, SUBSCRIPTION_NAME => v_subscription_description);            
    SELECT handle INTO vSubhandle FROM all_subscriptions
        WHERE description = v_subscription_description;   
     
    col_names := 'CUST_ID NUMBER,CUST_NAME VARCHAR2(50),CUST_TYPE NUMBER,BIRTH_DATE DATE';
     
    DBMS_LOGMNR_CDC_SUBSCRIBE.SUBSCRIBE (vSubhandle
        , sys_context('USERENV','SESSION_SCHEMA'), 'CUSTOMER', col_names);
     
    DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(vSubhandle);
END;


We can view subscription in all_subscriptions table.
SELECT handle, set_name, created, status, earliest_scn, latest_scn,
       last_purged, last_extended
  FROM all_subscriptions
We have to extend our subscription to capture change data . After extending we can retrieve records from cdc_customer table that have cscn$ between earliest_scn and latest_scn. These change set can be executed on destination database. After a successful execution we should purge the subscription.

Extending subscription
declare
    vSubhandle NUMBER;
    v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
    SELECT handle INTO vSubhandle FROM all_subscriptions
        WHERE description = v_subscription_description;
    -- Extend the window for subscription
    DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE=>vSubhandle);
END;



Purging subscription
declare
    vSubhandle NUMBER;
    v_subscription_description VARCHAR2(1000):= 'SUB_CUSTOMER';
BEGIN
    vSubhandle :=0;
    SELECT handle INTO vSubhandle FROM all_subscriptions
        WHERE description = v_subscription_description;
    DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
        SUBSCRIPTION_HANDLE=> vSubhandle);
END;

Monday, February 8, 2010

Oracle CDC (Change Data Capture) with Materialized View Log

With materialized view log we can capture changed data in a table. Below there is simple implementation of this.

Assume that we have table that has structure below.
CREATE TABLE customer (
    cust_id NUMBER,
    cust_name VARCHAR2(50),
    cust_type NUMBER,
    birth_date DATE
);

First we should create materialized view log on this table.
CREATE MATERIALIZED VIEW LOG ON customer WITH
    ROWID, SEQUENCE (cust_id, cust_name, cust_type, birth_date)
    INCLUDING NEW VALUES

After this a table named MLOG$_customer will be automatically created. And every dml statement on customer table will be captured in materialized log table.
Lets execute som dml statements.
INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
     VALUES (1,'John', 1, TO_DATE ('25/02/1978', 'DD/MM/YYYY'))

INSERT INTO customer(cust_id, cust_name, cust_type, birth_date)
     VALUES (2,'Gabriel', 5, TO_DATE ('25/02/1985', 'DD/MM/YYYY'))

UPDATE customer
   SET cust_type = 7,
       birth_date = TO_DATE ('25/02/1982', 'DD/MM/YYYY')
 WHERE cust_id = 1


DELETE FROM customer
      WHERE cust_id = 2

We can easily see changing records in materialized view log table by querying tables below.
select * from customer
select * from MLOG$_customer


And finally with writing the below statement we can determine dml statements according to execution order.
SELECT   a1.sequence$$, a1.dmltype$$, a1.cust_id, a1.cust_name, a1.cust_type,
         a1.birth_date, a2.cust_id cust_id_n, a2.cust_name cust_name_n,
         a2.cust_type cust_type_n, a2.birth_date birth_date_n
    FROM mlog$_customer a1, mlog$_customer a2
   WHERE a1.dmltype$$ = 'U'
     AND a1.old_new$$ = 'U'
     AND a2.dmltype$$ = 'U'
     AND a2.old_new$$ = 'N'
     AND a1.m_row$$ = a2.m_row$$
    AND a1.sequence$$ + 1 = a2.sequence$$
UNION
SELECT   a1.sequence$$, a1.dmltype$$, a1.cust_id, a1.cust_name, a1.cust_type,
         a1.birth_date, NULL, NULL, NULL, NULL
    FROM mlog$_customer a1
   WHERE dmltype$$ = 'I'
UNION
SELECT   a1.sequence$$, a1.dmltype$$, a1.cust_id, a1.cust_name, a1.cust_type,
         a1.birth_date, NULL, NULL, NULL, NULL
    FROM mlog$_customer a1
   WHERE dmltype$$ = 'D'
ORDER BY sequence$$






You can use this method to transport data from an oltp system to a staging area(Datawarehouse).
Materealized view log uses triggers to feed log table.

Monday, December 14, 2009

How to generate insert statements of a table in Oracle?


This code snippet takes one parameter as table name. Then generates insert statements of current data in this table. DBMS_SQL package is used to detect column types and retrieve data from table. You may want to look at "Is it possible to return table from oracle function?" post before you examine this code snippet. This code works on Oracle databases which have version 10g or higher.

PACKAGE PKG_UTIL2 IS
    TYPE record_type IS RECORD (strSql    varchar2(8192));
    TYPE tbl_record_type IS TABLE OF record_type;
    FUNCTION f_get_tbl_insert_dml(p_table_name in varchar2
        ) RETURN tbl_record_type PIPELINED;

END; -- Package spec

PACKAGE BODY PKG_UTIL2 IS
    function f_get_tbl_insert_dml(p_table_name in varchar2
        ) return tbl_record_type pipelined as
 
        out_rec record_type;
        str_sql   varchar2(8192);
        desc_tab    dbms_sql.desc_tab;
        col_cnt     number;
        cur_id      NUMBER;
        namevar  VARCHAR2(32767);
        numvar   NUMBER;
        datevar  DATE;
        opr     varchar2(4000);
        ret_val varchar2(8192);
        tbl_col_cnt number;
     
    begin
        str_sql := 'select * from '||p_table_name;
        cur_id := dbms_sql.open_cursor;
        dbms_sql.parse( c => cur_id, statement => str_sql, language_flag => dbms_sql.native);
  
        dbms_sql.describe_columns( c => cur_id, col_cnt => col_cnt, desc_t  => desc_tab);
  
        FOR i IN 1 .. col_cnt LOOP
            IF desc_tab(i).col_type = 2 THEN
                DBMS_SQL.DEFINE_COLUMN(cur_id, i, numvar);
            ELSIF desc_tab(i).col_type = 12 THEN
                DBMS_SQL.DEFINE_COLUMN(cur_id, i, datevar);
            ELSE
                DBMS_SQL.DEFINE_COLUMN(cur_id, i, namevar, 4000);
            END IF;
        END LOOP;
  
        if DBMS_SQL.execute(cur_id) = 0 then
            while DBMS_SQL.FETCH_ROWS(cur_id) > 0 loop
                ret_val := 'INSERT INTO ' || p_table_name || ' VALUES (';
                for i in 1..col_cnt loop
                    if desc_tab(i).col_type = 2 then  --number
                        DBMS_SQL.COLUMN_VALUE(cur_id, i, numvar);
                        if numvar is null then
                            ret_val := ret_val ||  'null,';
                        else
                            ret_val := ret_val || replace(to_char(numvar),',','.') || ',';
                        end if;
                    elsif desc_tab(i).col_type = 12 then --date
                        DBMS_SQL.COLUMN_VALUE(cur_id, i, datevar);
                        if datevar is null then
                            ret_val := ret_val ||  'null,';
                        else
                            if datevar = trunc(datevar) then
                                ret_val := ret_val || 'to_date('''
                                    || to_char(datevar, 'DD/MM/YYYY') ||''',''DD/MM/YYYY''),';
                            else
                                ret_val := ret_val || 'to_date('''
                                    || to_char(datevar, 'DD/MM/YYYY hh24:mi:ss') ||''',''DD/MM/YYYY hh24:mi:ss''),';
                            end if;
                        end if;
                    else --varchar2, char, others
                        DBMS_SQL.COLUMN_VALUE(cur_id, i, namevar);
                        if namevar is null then
                            ret_val := ret_val ||  'null,';
                        else
                            ret_val := ret_val || '''' || namevar || ''',';
                        end if;
                    end if;
                end loop;
            
                ret_val := substr(ret_val,1,length(ret_val)-1)  || ')';
                out_rec.strSql := ret_val;
                pipe row (out_rec);
            end loop;
        end if;
        DBMS_SQL.CLOSE_CURSOR(cur_id);
  
        return;
    end;
END;

Usage
select * from table(PKG_UTIL2.f_get_tbl_insert_dml('customer'))

select * from customer 











select * from table(PKG_UTIL2.f_get_tbl_insert_dml('customer'))



Tuesday, December 8, 2009

Is it possible to return table from oracle function?

 
In Oracle 9i and previous versions you could return table from a function by creating a type and a function which returns that type as pipelined function. In Oracle databases which have lower versions than 10g we should create that object on database. And it was a bit hard when you want to change the return structure.
We had to drop that type and recreate it. In Oracle 10g and higher verions we can return table from a function by no creating any type on database. We could do this by declaring a record in package spec. The code below works on Oracle databases which have version is 10g or higher.

PACKAGE PKG_PIPELINE IS
    TYPE record_type IS RECORD (
        string      varchar2(8192),
        nmbr        number,
        dt         date
    );
    TYPE tbl_record_type IS TABLE OF record_type;
    function f_test(cnt in number) RETURN tbl_record_type PIPELINED;
 
END; -- Package spec



PACKAGE BODY PKG_PIPELINE IS
    function f_test(cnt in number) RETURN tbl_record_type PIPELINED as
        PRAGMA AUTONOMOUS_TRANSACTION;
        out_rec     record_type;
    begin
      
        out_rec.string := 'A';
        out_rec.nmbr := 5;
        out_rec.dt := sysdate;
        pipe row (out_rec);
     
        for i in 1..2*cnt loop
            out_rec.string := 'B';
            out_rec.nmbr := i;
            out_rec.dt := sysdate-1;
            pipe row (out_rec);
        end loop;
        return;
    end;
  END;


Usage
select * from table(PKG_PIPELINE.f_test(3))