flexible pipelined functions
Pipelined functions, type polymorphism (type hierarchies and substitution) and multi-table inserts are three features from the Oracle 9i timeframe. Pipelined functions are used primarily in ETL applications (extract, transform and load) for efficient processing of complex transformations. Type substitution is used for modelling complex data types and relationships in more object-oriented applications. Multi-table inserts (another ETL feature of Oracle) are used to load one or more tables from a single rowsource.
In this article we will combine these features by using type polymorphism to return multiple record structures from a single pipelined function. We will load the resulting dataset into multiple tables using multi-table insert.
It is assumed that readers are familiar with the technologies being described in this article. For readers who require some background reading, oracle-developer.net has articles covering each of these features.
why is this technique significant?
This technique is significant on two counts. First, it is usually the case that pipelined functions will return a single, known record structure (unless using complex ANYDATASET structures made available by Oracle's Data Cartridge). The Oracle documentation and other articles often show pipelined functions transforming a single input record into two output records, but always of the same structure. This article will show how single input records can be transformed into multiple output rows of different structures.
Second, this technique solves a very particular, but common, performance problem. It is quite common in traditional batch environments (such as data warehouses) to receive source data in flat-files. It is equally common that some of these files contain data for more than one target table in the data warehouse. Assuming that the data transformations that must take place are complex (i.e. too complex for SQL), a typical loading approach is as follows (in pseudo-code).
FOR rec IN (SELECT * FROM source_data) LOOP
   ...prepare table_A variables...
   ...prepare table_B variables...
   ...prepare table_C variables...
   INSERT INTO table_A VALUES (...);
   INSERT INTO table_B VALUES (...);
   INSERT INTO table_C VALUES (...);
END LOOP;
This is a simple process to code and understand, yet it is inefficient and slow. Using the techniques we will describe in this article, we will combine the efficiency of bulk SQL, parallel pipelined functions and substitutable types to change this load to something that resembles the following pseudo-code.
INSERT FIRST
   WHEN (record is of table_A format)
      INTO table_A VALUES (...)
   WHEN (record is of table_B format)
      INTO table_B VALUES (...)
   WHEN (record is of table_C format)
      INTO table_C VALUES (...)
SELECT ...
FROM   TABLE(parallel_pipelined_function(CURSOR(...)));
setup
We will begin by setting up our sample application. We are going to use a simplified investment trading model for our demonstrations. This will include:
- TRADES: this will store information on a trade that takes place between two counterparties;
- TRADE_LEGS: child of TRADES. A single trade might comprise multiple legs (e.g. a swap trade). In our application, we are going to ensure that every trade has two legs; and
- LEG_VALUATIONS: child of TRADE_LEGS. Each leg of a trade will be valued every day that it is active (i.e. not matured).
target tables
Given this simple model, we can now create our target tables. Note that all tables are defined with PARALLEL because we will exploit this later when we come to using our pipelined function.
SQL> CREATE TABLE trades
  2  ( trade_id         NUMBER
  3  , product_type     VARCHAR2(10)
  4  , counterparty     VARCHAR2(30)
  5  , trade_timestamp  TIMESTAMP
  6  , trading_book     VARCHAR2(30)
  7  , maturity_date    DATE
  8  , CONSTRAINT trades_pk
  9       PRIMARY KEY (trade_id)
 10  )
 11  PARALLEL;
Table created.
SQL> CREATE TABLE trade_legs
  2  ( trade_id      NUMBER
  3  , leg_no        NUMBER
  4  , trade_amount  NUMBER
  5  , currency      VARCHAR2(3)
  6  , trade_price   NUMBER
  7  , CONSTRAINT trade_legs_pk
  8       PRIMARY KEY (trade_id, leg_no)
  9  , CONSTRAINT trade_legs_fk01
 10       FOREIGN KEY (trade_id)
 11       REFERENCES trades (trade_id)
 12  )
 13  PARALLEL;
Table created.
SQL> CREATE TABLE leg_valuations
  2  ( trade_id        NUMBER
  3  , leg_no          NUMBER
  4  , valuation_date  DATE
  5  , market_value    NUMBER
  6  , CONSTRAINT leg_valuations_pk
  7       PRIMARY KEY (trade_id, leg_no, valuation_date)
  8  , CONSTRAINT leg_valuations_fk01
  9       FOREIGN KEY (trade_id, leg_no)
 10       REFERENCES trade_legs (trade_id, leg_no)
 11  )
 12  PARALLEL;
Table created.
source data
For our demonstration, we require some source data. This is typically provided in flat-files and loaded into a staging table (or made available directly via an external table). For simplicity, we will create a staging table and populate it with data as follows. Again, this table is defined with PARALLEL.
SQL> CREATE TABLE trades_staging
  2  ( trade_id         NUMBER
  3  , leg_no           NUMBER
  4  , valuation_date   DATE
  5  , product_type     VARCHAR2(10)
  6  , counterparty     VARCHAR2(30)
  7  , trade_timestamp  TIMESTAMP
  8  , trading_book     VARCHAR2(30)
  9  , maturity_date    DATE
 10  , trade_amount     NUMBER
 11  , currency         VARCHAR2(3)
 12  , trade_price      NUMBER
 13  , market_value     NUMBER
 14  )
 15  PARALLEL;
Table created.
SQL> INSERT INTO trades_staging
  2  SELECT object_id AS trade_id
  3  ,      ROW_NUMBER() OVER
  4            (PARTITION BY object_id ORDER BY 1) AS leg_no
  5  ,      TRUNC(SYSDATE)-1 AS valuation_date
  6  ,      SUBSTR(object_type,1,10) AS product_type
  7  ,      owner AS counterparty
  8  ,      TO_TIMESTAMP(timestamp,'YYYY-MM-DD:HH24:MI:SS') AS trade_timestamp
  9  ,      object_type AS trading_book
 10  ,      created + MOD(object_id,500) AS maturity_date
 11  ,      ABS(DBMS_RANDOM.RANDOM) AS trade_amount
 12  ,      'GBP' AS currency
 13  ,      DBMS_RANDOM.VALUE AS trade_price
 14  ,      ABS(DBMS_RANDOM.RANDOM) AS market_value
 15  FROM   all_objects
 16  ,      (SELECT NULL FROM all_objects WHERE ROWNUM <= 2)
 17  WHERE  object_id IS NOT NULL;
99716 rows created.
The source data is manufactured in such a way that there are two legs (and corresponding valuations) for every trade. Because the source data is denormalised, the TRADES attributes are duplicated across each pair of leg records. As with the scenario described in the introduction, we will need to transform and load this source data into each of our three trading tables.
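As a quick sanity-check of this pairing, we could count the staged rows by leg number. This is a hypothetical verification query (not part of the original worked example); we would expect identical counts for legs 1 and 2.

-- Sketch: every trade should contribute exactly one row per leg...
SELECT leg_no
,      COUNT(*) AS staged_rows
FROM   trades_staging
GROUP  BY leg_no
ORDER  BY leg_no;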
type hierarchy
Pipelined functions require two types: one to define an output record structure and one to buffer a collection of this structure. Usually, we create a single object type and corresponding collection type for this purpose. As described earlier, however, we are going to use type polymorphism to enable us to pass around record types of different structures under the guise of a single object type. To do this, we need to create a type hierarchy. Our hierarchy will comprise the following:
- a single supertype, containing a single attribute that each subtype will inherit. This will be a "generic" type, used to define parameters and variables in our PL/SQL APIs; and
- three subtypes, each matching one of the structures of our sample trading tables (minus the supertype attribute). Because these are defined as subtypes, they can be used in place of the supertype (i.e. substitute for the supertype).
We will begin by creating our "generic" supertype as follows. This will be the object type that the pipelined function will stream from the PIPE ROW statements.
SQL> CREATE TYPE transaction_ot AS OBJECT
  2  ( transaction_id NUMBER
  3  )
  4  NOT FINAL;
  5  /
Type created.
Note that we have declared this as NOT FINAL to indicate that we are creating a hierarchy that will be implemented through subtypes. We have included a single attribute of TRANSACTION_ID to represent a generic primary key attribute that may or may not be extended in the subtypes (in our trading model, we have a TRADE_ID as the primary transaction key). We could also define this as NOT INSTANTIABLE if we wished to ensure that no direct instances of the supertype were coded into our programs.
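For reference, that stricter declaration would look as follows (a sketch only; the rest of this article retains the instantiable version created above).

-- Sketch: a supertype that cannot itself be instantiated...
CREATE TYPE transaction_ot AS OBJECT
( transaction_id NUMBER
)
NOT INSTANTIABLE NOT FINAL;
/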
This object type defines the structure of a single record returned by our pipelined function. As with all pipelined function implementations, we must create a corresponding collection type, as follows.
SQL> CREATE OR REPLACE TYPE transaction_ntt
  2     AS TABLE OF transaction_ot;
  3  /
Type created.
In "regular" pipelined function implementations, we would be ready to code the function at this point. However, such a function would only pipe out arrays of a single record structure containing a single attribute (TRANSACTION_ID). We want to be able to pipe back multiple record structures from the function, hence we need a type hierarchy. To complete this hierarchy, we will create three further object types: one for each of our target trading tables described earlier. We will begin by creating a subtype for TRADES records as follows. Note that the TRADE_ID column will be accounted for by the supertype's generic TRANSACTION_ID attribute.
SQL> CREATE TYPE trade_ot UNDER transaction_ot
  2  ( product_type     VARCHAR2(10)
  3  , counterparty     VARCHAR2(30)
  4  , trade_timestamp  TIMESTAMP
  5  , trading_book     VARCHAR2(30)
  6  , maturity_date    DATE
  7  , CONSTRUCTOR FUNCTION trade_ot
  8       RETURN SELF AS RESULT
  9  )
 10  FINAL;
 11  /
Type created.
Note that we have defined this as a subtype of our generic supertype using the UNDER syntax. Note also that we have declared a non-default constructor function. This is purely for convenience later, when we will be able to initialise an instance of this type without having to pass a value for each attribute in the type. We therefore need a type body, as follows.
SQL> CREATE TYPE BODY trade_ot AS
  2     CONSTRUCTOR FUNCTION trade_ot
  3        RETURN SELF AS RESULT IS
  4     BEGIN
  5        RETURN;
  6     END;
  7  END;
  8  /
Type body created.
We will now continue the pattern and complete our type hierarchy by creating the remaining types.
SQL> CREATE TYPE trade_leg_ot UNDER transaction_ot
  2  ( leg_no        NUMBER
  3  , trade_amount  NUMBER
  4  , currency      VARCHAR2(3)
  5  , trade_price   NUMBER
  6  , CONSTRUCTOR FUNCTION trade_leg_ot
  7       RETURN SELF AS RESULT
  8  )
  9  FINAL;
 10  /
Type created.
SQL> CREATE TYPE BODY trade_leg_ot AS
  2     CONSTRUCTOR FUNCTION trade_leg_ot
  3        RETURN SELF AS RESULT IS
  4     BEGIN
  5        RETURN;
  6     END;
  7  END;
  8  /
Type body created.
SQL> CREATE TYPE leg_valuation_ot UNDER transaction_ot
  2  ( leg_no          NUMBER
  3  , valuation_date  DATE
  4  , market_value    NUMBER
  5  , CONSTRUCTOR FUNCTION leg_valuation_ot
  6       RETURN SELF AS RESULT
  7  )
  8  FINAL;
  9  /
Type created.
SQL> CREATE TYPE BODY leg_valuation_ot AS
  2     CONSTRUCTOR FUNCTION leg_valuation_ot
  3        RETURN SELF AS RESULT IS
  4     BEGIN
  5        RETURN;
  6     END;
  7  END;
  8  /
Type body created.
creating the pipelined function
To recap, therefore, we have a single set of source data and three target tables to be loaded from this source data. We have described this data via a type hierarchy that we will now use in our pipelined function implementation. We will begin with a package specification, as follows.
SQL> CREATE PACKAGE trades_load AS
  2
  3     FUNCTION trades_transform( p_source_data IN SYS_REFCURSOR )
  4        RETURN transaction_ntt
  5        PIPELINED
  6        PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
  7
  8     PROCEDURE load_trades;
  9
 10  END trades_load;
 11  /
Package created.
Note that our package has two public programs: our pipelined function (defined as parallel-enabled) and a procedure to invoke the load itself. Note how our pipelined function returns the generic collection type as described earlier. That is, it will return multiple instances of the TRANSACTION_OT type or any other type that is allowed to substitute for it (i.e. any of our three subtypes). This is the critical point and it highlights the flexibility we achieve by using type polymorphism in this way.
We will now implement our pipelined function by creating the package body. Note that the loading procedure is stubbed at this stage to minimise the code listing.
SQL> CREATE PACKAGE BODY trades_load AS
  2
  3     ------------------------------------------------------------
  4
  5     FUNCTION trades_transform( p_source_data IN SYS_REFCURSOR )
  6        RETURN transaction_ntt
  7        PIPELINED
  8        PARALLEL_ENABLE (PARTITION p_source_data BY ANY) IS
  9
 10        /* Array of input record type... */
 11        TYPE aat_source_data IS TABLE OF trades_staging%ROWTYPE
 12           INDEX BY PLS_INTEGER;
 13        aa_source_data aat_source_data;
 14        r_source_data  trades_staging%ROWTYPE;
 15
 16        /* Output record types... */
 17        r_trade         trade_ot := trade_ot();
 18        r_trade_leg     trade_leg_ot := trade_leg_ot();
 19        r_leg_valuation leg_valuation_ot := leg_valuation_ot();
 20
 21     BEGIN
 22
 23        LOOP
 24
 25           FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT 100;
 26
 27           FOR i IN 1 .. aa_source_data.COUNT LOOP
 28
 29              /* Work with a single record... */
 30              r_source_data := aa_source_data(i);
 31
 32              /* Reset the variables... */
 33              r_trade := trade_ot();
 34              r_trade_leg := trade_leg_ot();
 35              r_leg_valuation := leg_valuation_ot();
 36
 37              /* Prepare and pipe the trade record... */
 38              IF r_source_data.leg_no = 1 THEN
 39
 40                 r_trade.transaction_id  := r_source_data.trade_id;
 41                 r_trade.product_type    := r_source_data.product_type;
 42                 r_trade.counterparty    := r_source_data.counterparty;
 43                 r_trade.trading_book    := r_source_data.trading_book;
 44                 r_trade.trade_timestamp := r_source_data.trade_timestamp;
 45                 r_trade.maturity_date   := r_source_data.maturity_date;
 46
 47                 PIPE ROW (r_trade);
 48
 49              END IF;
 50
 51              /* Prepare and pipe the trade_leg record... */
 52              r_trade_leg.transaction_id := r_source_data.trade_id;
 53              r_trade_leg.leg_no         := r_source_data.leg_no;
 54              r_trade_leg.trade_amount   := r_source_data.trade_amount;
 55              r_trade_leg.currency       := r_source_data.currency;
 56              r_trade_leg.trade_price    := r_source_data.trade_price;
 57
 58              PIPE ROW (r_trade_leg);
 59
 60              /* Prepare and pipe the leg_valuation record... */
 61              r_leg_valuation.transaction_id := r_source_data.trade_id;
 62              r_leg_valuation.leg_no         := r_source_data.leg_no;
 63              r_leg_valuation.valuation_date := r_source_data.valuation_date;
 64              r_leg_valuation.market_value   := r_source_data.market_value;
 65
 66              PIPE ROW (r_leg_valuation);
 67
 68           END LOOP;
 69
 70           EXIT WHEN p_source_data%NOTFOUND;
 71
 72        END LOOP;
 73        CLOSE p_source_data;
 74
 75        RETURN;
 76
 77     END trades_transform;
 78
 79     ------------------------------------------------------------
 80
 81     PROCEDURE load_trades IS
 82     BEGIN
 83        NULL;
 84     END load_trades;
 85
 86  END trades_load;
 87  /
Package body created.
We now have a pipelined function that returns three different record structures as substitutes for the supertype. Note in particular the following:
- Lines 17-19: we have three different record variables to be populated and returned from each staging row;
- Lines 33-35: prior to processing each source row, we reset each record by invoking the non-default type constructors we created earlier;
- Lines 38-50: we only want to pipe one trade record per trade leg pairing, hence the LEG_NO test. We assign the TRADES "record" and pipe it;
- Lines 52-58: we prepare and pipe a TRADE_LEGS record for each source record;
- Lines 61-66: we prepare and pipe a LEG_VALUATIONS record for each source record.
For performance reasons, we have defined the pipelined function as parallel-enabled and are using bulk fetches from the cursor in small array sizes. A critical point to note is that, for clarity, this example deliberately excludes the complex transformations that would necessitate a PL/SQL solution. Given the simple mappings above, it would of course be much simpler and more efficient to use bulk SQL alone; for the purposes of this article, the need for a PL/SQL approach is assumed.
loading from the pipelined function
We will now use our pipelined function. We will begin by demonstrating how we query the function, starting with a simple SQL statement as follows.
SQL> SELECT *
  2  FROM   TABLE(
  3            trades_load.trades_transform(
  4               CURSOR( SELECT * FROM trades_staging ) ) )
  5  WHERE  ROWNUM <= 5;
TRANSACTION_ID
--------------
         14636
         14636
         14637
         14637
         14637

5 rows selected.
First, note the syntax. This should be familiar to users of pipelined functions and readers of the background articles mentioned earlier. We are passing a ref cursor parameter into the pipelined function because this is a prerequisite for parallel execution. Second, note how we use "SELECT *" but only receive a single column back. Remember that the pipelined function is based on our TRANSACTION_OT type (and TRANSACTION_NTT collection type). This type only contains a single attribute, so what we see above is semantically correct, even though we have piped rows of different structures.
The reason (and solution) is simple. When using type substitution, Oracle does not downcast a supertype into its correct subtype unless we tell it to. We do this in two stages. First, we must retrieve the actual object instances from the function and not the individual attributes. In the first example, we tried to access the attributes using "SELECT *". We retrieve the actual object instances by using the VALUE function, as follows.
SQL> SELECT VALUE(nt)
  2  FROM   TABLE(
  3            trades_load.trades_transform(
  4               CURSOR( SELECT * FROM trades_staging ) ) ) nt
  5  WHERE  ROWNUM <= 5;
VALUE(NT)(TRANSACTION_ID)
-----------------------------------------------------------------------------------------
TRADE_LEG_OT(14636, 2, 386190879, 'GBP', .724850851)
LEG_VALUATION_OT(14636, 2, '12-AUG-07', 2096427733)
TRADE_OT(14637, 'SYNONYM', 'PUBLIC', '30-AUG-05 14.16.38.000000', 'SYNONYM', '14-JAN-06')
TRADE_LEG_OT(14637, 1, 292552620, 'GBP', .555342993)
LEG_VALUATION_OT(14637, 1, '12-AUG-07', 670904643)

5 rows selected.
We can now see for the first time that we have a truly flexible pipelined function! In the output above, we have three different structures being returned from the function. We can take this a stage further and decode the type of each object instance using IS OF conditions, as follows.
SQL> SELECT VALUE(nt) AS record_value
  2  ,      CASE
  3            WHEN VALUE(nt) IS OF TYPE (trade_ot)
  4            THEN 'TRADES'
  5            WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
  6            THEN 'TRADE_LEGS'
  7            ELSE 'LEG_VALUATIONS'
  8         END AS record_type
  9  FROM   TABLE(
 10            trades_load.trades_transform(
 11               CURSOR( SELECT * FROM trades_staging ) ) ) nt
 12  WHERE  ROWNUM <= 5;
RECORD_VALUE(TRANSACTION_ID)                                           RECORD_TYPE
---------------------------------------------------------------------- --------------------
TRADE_LEG_OT(14636, 2, 386190879, 'GBP', .724850851)                   TRADE_LEGS
LEG_VALUATION_OT(14636, 2, '12-AUG-07', 2096427733)                    LEG_VALUATIONS
TRADE_OT(14637, 'SYNONYM', 'PUBLIC', '30-AUG-05 14.16.38.000000',      TRADES
'SYNONYM', '14-JAN-06')
TRADE_LEG_OT(14637, 1, 292552620, 'GBP', .555342993)                   TRADE_LEGS
LEG_VALUATION_OT(14637, 1, '12-AUG-07', 670904643)                     LEG_VALUATIONS

5 rows selected.
We have now "labelled" each object instance with the table it is eventually going to be loaded into. When we build our multi-table insert statement later, it will be obvious why we have included this CASE expression. To complete the downcasting of the object instances to their correct subtypes, we require a final step. Using the TREAT function, we can attempt to cast each object instance to each of the subtypes, as follows.
SQL> SELECT CASE
  2            WHEN VALUE(nt) IS OF TYPE (trade_ot)
  3            THEN 'TRADES'
  4            WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
  5            THEN 'TRADE_LEGS'
  6            ELSE 'LEG_VALUATIONS'
  7         END AS record_type
  8  ,      TREAT(VALUE(nt) AS trade_ot) AS trade_rec
  9  ,      TREAT(VALUE(nt) AS trade_leg_ot) AS trade_leg_rec
 10  ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
 11  FROM   TABLE(
 12            trades_load.trades_transform(
 13               CURSOR( SELECT * FROM trades_staging ) ) ) nt
 14  WHERE  ROWNUM <= 5;
RECORD_TYPE    TRADE_REC                      TRADE_LEG_REC                  LEG_VALUATION_REC
-------------- ------------------------------ ------------------------------ --------------------------
TRADE_LEGS                                    TRADE_LEG_OT(14636, 2, 3861908
                                              79, 'GBP', .724850851)
LEG_VALUATIONS                                                               LEG_VALUATION_OT(14636, 2,
                                                                             '12-AUG-07', 2096427733)
TRADES         TRADE_OT(14637, 'SYNONYM', 'PU
               BLIC', '30-AUG-05 14.16.38.000
               000', 'SYNONYM', '14-JAN-06')
TRADE_LEGS                                    TRADE_LEG_OT(14637, 1, 2925526
                                              20, 'GBP', .555342993)
LEG_VALUATIONS                                                               LEG_VALUATION_OT(14637, 1,
                                                                             '12-AUG-07', 670904643)

5 rows selected.
Of course, each record returning from the pipelined function is of one subtype only. On each record, therefore, two of the TREAT functions will return NULL and only one will yield the correct subtype. At this stage, however, we have successfully returned multiple record types from a single pipelined function and are now ready to access their respective attributes. We do this as follows.
SQL> SELECT ilv.record_type
  2  ,      ilv.trade_rec.transaction_id AS trade_id
  3  ,      ilv.trade_rec.product_type AS product_type
  4  ,      ilv.trade_leg_rec.leg_no AS leg_no
  5  ,      ilv.leg_valuation_rec.valuation_date AS valuation_date
  6  FROM  (
  7         SELECT CASE
  8                   WHEN VALUE(nt) IS OF TYPE (trade_ot)
  9                   THEN 'TRADES'
 10                   WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
 11                   THEN 'TRADE_LEGS'
 12                   ELSE 'LEG_VALUATIONS'
 13                END AS record_type
 14         ,      TREAT(VALUE(nt) AS trade_ot) AS trade_rec
 15         ,      TREAT(VALUE(nt) AS trade_leg_ot) AS trade_leg_rec
 16         ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
 17         FROM   TABLE(
 18                   trades_load.trades_transform(
 19                      CURSOR( SELECT * FROM trades_staging ) ) ) nt
 20         WHERE  ROWNUM <= 5
 21        ) ilv;
RECORD_TYPE            TRADE_ID PRODUCT_TY     LEG_NO VALUATION
-------------------- ---------- ---------- ---------- ---------
TRADE_LEGS                                          2
LEG_VALUATIONS                                        12-AUG-07
TRADES                    14637 SYNONYM
TRADE_LEGS                                          1
LEG_VALUATIONS                                        12-AUG-07

5 rows selected.
This shows a small sample of the available attributes, but we now have everything we need for our multi-table load. A restriction of multi-table insert is that we cannot carry object instances up to the VALUES clauses on the inserts. Hence we must decompose the objects in the SELECT section, as we see above. Given this, we will now add the LOAD_TRADES procedure to our package body. Note that the pipelined function code is omitted for brevity.
SQL> CREATE OR REPLACE PACKAGE BODY trades_load AS
  2
  3     ------------------------------------------------------------
  4
  5     FUNCTION trades_transform( ...snip...
 78
 79     ------------------------------------------------------------
 80
 81     PROCEDURE load_trades IS
 82     BEGIN
 83
 84        INSERT FIRST
 85           WHEN record_type = 'TRADES'
 86           THEN
 87              INTO trades ( trade_id
 88                          , product_type
 89                          , counterparty
 90                          , trade_timestamp
 91                          , trading_book
 92                          , maturity_date
 93                          )
 94              VALUES ( trade_id
 95                     , product_type
 96                     , counterparty
 97                     , trade_timestamp
 98                     , trading_book
 99                     , maturity_date
100                     )
101           WHEN record_type = 'TRADE_LEGS'
102           THEN
103              INTO trade_legs ( trade_id
104                              , leg_no
105                              , trade_amount
106                              , currency
107                              , trade_price
108                              )
109              VALUES ( trade_id
110                     , leg_no
111                     , trade_amount
112                     , currency
113                     , trade_price
114                     )
115           WHEN record_type = 'LEG_VALUATIONS'
116           THEN
117              INTO leg_valuations ( trade_id
118                                  , leg_no
119                                  , valuation_date
120                                  , market_value
121                                  )
122              VALUES ( trade_id
123                     , leg_no
124                     , valuation_date
125                     , market_value
126                     )
127        SELECT ilv.record_type
128        ,      COALESCE(
129                  ilv.trade_rec.transaction_id,
130                  ilv.trade_leg_rec.transaction_id,
131                  ilv.leg_valuation_rec.transaction_id
132               ) AS trade_id
133        ,      COALESCE(
134                  ilv.trade_leg_rec.leg_no,
135                  ilv.leg_valuation_rec.leg_no
136               ) AS leg_no
137        ,      ilv.trade_rec.product_type AS product_type
138        ,      ilv.trade_rec.counterparty AS counterparty
139        ,      ilv.trade_rec.trade_timestamp AS trade_timestamp
140        ,      ilv.trade_rec.trading_book AS trading_book
141        ,      ilv.trade_rec.maturity_date AS maturity_date
142        ,      ilv.trade_leg_rec.trade_amount AS trade_amount
143        ,      ilv.trade_leg_rec.currency AS currency
144        ,      ilv.trade_leg_rec.trade_price AS trade_price
145        ,      ilv.leg_valuation_rec.valuation_date AS valuation_date
146        ,      ilv.leg_valuation_rec.market_value AS market_value
147        FROM  (
148               SELECT CASE
149                         WHEN VALUE(nt) IS OF TYPE (trade_ot)
150                         THEN 'TRADES'
151                         WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
152                         THEN 'TRADE_LEGS'
153                         ELSE 'LEG_VALUATIONS'
154                      END AS record_type
155               ,      TREAT(VALUE(nt) AS trade_ot) AS trade_rec
156               ,      TREAT(VALUE(nt) AS trade_leg_ot) AS trade_leg_rec
157               ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
158               FROM   TABLE(
159                         trades_load.trades_transform(
160                            CURSOR( SELECT * FROM trades_staging ) ) ) nt
161              ) ilv;
162
163        DBMS_OUTPUT.PUT_LINE( SQL%ROWCOUNT || ' rows inserted.' );
164
165     END load_trades;
166
167  END trades_load;
168  /
Package body created.
Our load is quite simple. We select the attributes from each record as described in earlier examples and label each row with its target table using a CASE expression. This "label" is used in the WHEN clauses of the INSERT FIRST section to determine which table the record represents and we insert accordingly. We now have a multi-table load using a pipelined function and type substitution.
testing the loads
We are now ready to test our loads. Remember that we enabled parallel at both table and function level. In multi-table inserts, the entire DML section is parallel-enabled if just one of the target tables is set to parallel. We will therefore enable parallel query and DML in our session as follows.
SQL> ALTER SESSION ENABLE PARALLEL QUERY;
Session altered.
SQL> ALTER SESSION ENABLE PARALLEL DML;
Session altered.
We will now execute our load, as follows.
SQL> exec trades_load.load_trades;
BEGIN trades_load.load_trades; END;

*
ERROR at line 1:
ORA-02291: integrity constraint (SCOTT.TRADE_LEGS_FK01) violated - parent key
not found
ORA-06512: at "SCOTT.TRADES_LOAD", line 72
ORA-06512: at line 1
This is disappointing! Oracle has tried to load a TRADE_LEGS record before its parent TRADES record is present. The reason for this is simple: multi-table inserts do not guarantee the order of inserts, and we are seeing evidence of this. We might think that we can simply force the ordering of the input data, but this does not change the fact that Oracle has decided to load TRADE_LEGS before TRADES. In the sample code file (see bottom of article for download details) another version of the TRADES_LOAD package is included. This is the same as the above with the following additions (sketched after the list), all added in a vain attempt to ensure that the INSERT FIRST section receives data in the order in which we need it to be loaded.
- an ORDER BY on the select from the pipelined function, to ensure that records are ordered in TRADES->TRADE_LEGS->LEG_VALUATIONS order; plus
- an ORDER BY on the CURSOR expression over the TRADES_STAGING table to ensure records are input into the pipelined function in TRADE_ID and LEG_NO order; plus
- a streaming clause on the pipelined function itself, which enables us to control the order in which data is piped to the consumer.
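For reference, these additions are sketched below. Note that this is an outline only and not the exact sample-file code; in particular, it assumes a hypothetical strongly typed ref cursor type (TRADES_STAGING_RC, declared in the package specification), because the streaming clause must reference named columns of the input cursor.

-- 1. Streaming clause on the function specification (assumes
--    TYPE trades_staging_rc IS REF CURSOR RETURN trades_staging%ROWTYPE)...
FUNCTION trades_transform( p_source_data IN trades_staging_rc )
   RETURN transaction_ntt
   PIPELINED
   PARALLEL_ENABLE (PARTITION p_source_data BY ANY)
   ORDER p_source_data BY (trade_id, leg_no);

-- 2. Ordered CURSOR expression over the staging table and
-- 3. an ordered select over the pipelined function...
SELECT ...
FROM   TABLE(
          trades_load.trades_transform(
             CURSOR( SELECT * FROM trades_staging
                     ORDER BY trade_id, leg_no ) ) ) nt
ORDER  BY trade_id
,         CASE record_type
             WHEN 'TRADES' THEN 1
             WHEN 'TRADE_LEGS' THEN 2
             ELSE 3
          END;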
Despite these three combined attempts at "brute force" ordering, we receive the same error message as above. The issue lies with the multi-table insert itself. We must therefore work around this problem in one of two ways:
- temporarily disable the foreign key constraints; or
- use deferrable foreign key constraints.
We will test both of these workarounds below.
disable/enable constraints
To ensure we can load all of our tables, we will disable the foreign key constraints, as follows.
SQL> ALTER TABLE trade_legs DISABLE CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations DISABLE CONSTRAINT leg_valuations_fk01;
Table altered.
We will now test our loading procedure again.
SQL> exec trades_load.load_trades;
249290 rows inserted.

PL/SQL procedure successfully completed.
SQL> COMMIT;
Commit complete.
We have successfully loaded our tables. As a quick sanity-check, we will count the records we loaded, as follows.
SQL> SELECT COUNT(*) FROM trades;
  COUNT(*)
----------
     49858

1 row selected.
SQL> SELECT COUNT(*) FROM trade_legs;
  COUNT(*)
----------
     99716

1 row selected.
SQL> SELECT COUNT(*) FROM leg_valuations;
  COUNT(*)
----------
     99716

1 row selected.
We will now enable our foreign key constraints, which will raise an exception (ORA-02298: parent keys not found) if there are any data issues.
SQL> ALTER TABLE trade_legs ENABLE CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations ENABLE CONSTRAINT leg_valuations_fk01;
Table altered.
Success! Note that this method is feasible for serialised (e.g. single-batch) systems, but it might not be suitable for concurrent applications. As an alternative, we can use deferrable constraints, as demonstrated below.
deferrable constraints
Deferred constraints enable us to postpone the checking of constraint violations until the end of a transaction (i.e. a COMMIT). This enables us to load data in a state that temporarily violates one or more constraints, but which is rectified at a later stage. The deferrable property of constraints can only be set at the time of creation. We will therefore drop and re-create our two foreign keys as follows.
SQL> ALTER TABLE trade_legs DROP CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations DROP CONSTRAINT leg_valuations_fk01;
Table altered.
SQL> ALTER TABLE trade_legs ADD
  2     CONSTRAINT trade_legs_fk01
  3     FOREIGN KEY (trade_id)
  4     REFERENCES trades (trade_id)
  5     DEFERRABLE;
Table altered.
SQL> ALTER TABLE leg_valuations ADD
  2     CONSTRAINT leg_valuations_fk01
  3     FOREIGN KEY (trade_id, leg_no)
  4     REFERENCES trade_legs (trade_id, leg_no)
  5     DEFERRABLE;
Table altered.
Our foreign keys are now deferrable. To exploit this, we must set their states to deferred, as follows.
SQL> SET CONSTRAINT trade_legs_fk01 DEFERRED;
Constraint set.
SQL> SET CONSTRAINT leg_valuations_fk01 DEFERRED;
Constraint set.
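Note that SET CONSTRAINT applies to the current transaction only. As an alternative (a sketch, not part of the original example), the foreign keys could have been created as INITIALLY DEFERRED, in which case every transaction defers them by default and the SET CONSTRAINT step becomes unnecessary.

-- Sketch: a deferrable constraint that is deferred by default...
ALTER TABLE trade_legs ADD
   CONSTRAINT trade_legs_fk01
   FOREIGN KEY (trade_id)
   REFERENCES trades (trade_id)
   DEFERRABLE INITIALLY DEFERRED;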
Having cleared the sample tables of the data loaded in the previous example, we will re-run our loading procedure.
SQL> exec trades_load.load_trades;
249305 rows inserted.

PL/SQL procedure successfully completed.
Again, we manage to load the three tables successfully, but we haven't validated the data yet. Deferred constraints are not checked until the end of a transaction, so we will now COMMIT our load (if any constraint were still violated at this point, the COMMIT itself would fail and the transaction would be rolled back).
SQL> COMMIT;
Commit complete.
Success! We will now check a small sample of the data we loaded.
SQL> SELECT * FROM trades WHERE ROWNUM <= 5;
  TRADE_ID PRODUCT_TYPE COUNTERPARTY  TRADE_TIMESTAMP            MATURITY_DATE
---------- ------------ ------------- -------------------------- -------------
      6774 TYPE         SYS           30-AUG-05 14.02.47.000000  31-MAY-06
      6778 VIEW         SYS           30-AUG-05 14.02.47.000000  04-JUN-06
      6782 VIEW         SYS           30-AUG-05 14.02.48.000000  08-JUN-06
      6786 VIEW         SYS           30-AUG-05 14.02.48.000000  12-JUN-06
      6790 TYPE         SYS           30-AUG-05 14.02.49.000000  16-JUN-06

5 rows selected.
SQL> SELECT * FROM trade_legs WHERE ROWNUM <= 5;
  TRADE_ID     LEG_NO TRADE_AMOUNT CURRENCY TRADE_PRICE
---------- ---------- ------------ -------- -----------
      6774          1   1624901166 GBP       .347733816
      6776          2    524881873 GBP       .904404062
      6778          1    622715309 GBP       .608247575
      6780          2    821449852 GBP       .508567497
      6782          1   1623359117 GBP       .674977682

5 rows selected.
SQL> SELECT * FROM leg_valuations WHERE ROWNUM <= 5;
  TRADE_ID     LEG_NO VALUATION_DATE MARKET_VALUE
---------- ---------- -------------- ------------
      2545          2 14-AUG-07        1042583556
      2546          1 14-AUG-07        1098505446
      2547          1 14-AUG-07          49145215
      2548          2 14-AUG-07          97502618
      2549          2 14-AUG-07         127231786

5 rows selected.
The primary advantage of this method over the disable/enable method is that concurrency is not affected by the constraint states of a single session. We can therefore adopt this method in a multi-user application that is likely to load the same target tables concurrently.
an alternative approach to consider
In this article, we have demonstrated a powerful technique, combining three features (multi-table insert, parallel pipelined functions and type substitution) to extend the ETL potential of Oracle. As a final note, however, there is a simple alternative that some readers might wish to explore. Remember that pipelined functions are usually defined by a single object type and a collection of this type. We can easily remove the type hierarchy from our example but still load multiple tables from a single function. We would instead develop the following:
- a single object type that contains all of the attributes needed for all target tables. In our example, we would have a single denormalised type containing all of the attributes in TRADES, TRADE_LEGS and LEG_VALUATIONS (see the sketch after this list);
- a collection of this single object type as usual;
- a pipelined function to prepare and pipe each output record with all denormalised attributes populated; and
- a multi-table insert that loads the attributes into their respective tables.
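For illustration, the denormalised type might look as follows (a sketch; the type name and attributes simply mirror the TRADES_STAGING table and are not taken from the original article).

-- Sketch: one flat object type carrying the attributes for all three targets...
CREATE TYPE trade_flat_ot AS OBJECT
( trade_id        NUMBER
, leg_no          NUMBER
, valuation_date  DATE
, product_type    VARCHAR2(10)
, counterparty    VARCHAR2(30)
, trade_timestamp TIMESTAMP
, trading_book    VARCHAR2(30)
, maturity_date   DATE
, trade_amount    NUMBER
, currency        VARCHAR2(3)
, trade_price     NUMBER
, market_value    NUMBER
);
/

CREATE TYPE trade_flat_ntt AS TABLE OF trade_flat_ot;
/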
With reference to our trading examples, the alternative multi-table insert would be based on the following pseudo-code.
INSERT ALL
   WHEN leg_no = 1
   THEN
      INTO trades VALUES (...)
   WHEN 1=1
   THEN
      INTO trade_legs VALUES (...)
   WHEN 1=1
   THEN
      INTO leg_valuations VALUES (...)
SELECT trade_id
,      leg_no
,      ...all other attributes of TRADES, TRADE_LEGS and LEG_VALUATIONS...
FROM   TABLE(
          trades_load.trades_transform(
             CURSOR( SELECT * FROM trades_staging ) ) );
Completing this is left as an exercise for the reader. The advantages of this method are that fewer records are piped from the function (i.e. several tables are loaded from each record) and the SELECT is much simpler (no type semantics are required). The disadvantage is that the records can become very wide and, at the time of writing, pipelined functions perform badly with wide record structures (over 50 attributes).
further reading
For further reading on the features described in this article, follow the links at the end of each of the articles referenced in the introduction to this article.
source code
The source code for the examples in this article can be downloaded from here.
Adrian Billington, August 2007