array-based advanced queuing in 10g

Advanced Queuing (AQ) is Oracle's native queuing technology. AQ has been available for many versions of Oracle and more performance features have gradually been added over several releases. This short article introduces two new functions added to the DBMS_AQ package in 10g: ENQUEUE_ARRAY and DEQUEUE_ARRAY.

As their names suggest, these APIs enable us to enqueue or dequeue batches of messages, rather than a single message at a time. As enqueues are INSERTs and dequeues are DELETEs (AQ uses tables for queuing), we can use these APIs to achieve some of the performance gains associated with bulk processing.

This article assumes that readers are familiar with AQ concepts. For an overview, read this oracle-developer.net article. This provides an introduction to queuing concepts, together with various setup requirements such as database objects, privileges etc.

setup

AQ users will be familiar with the concept of defining object types as payload structures. For array-based enqueue/dequeue, we will also require a collection type to enqueue and dequeue multiple payloads. In this article, we will create and use the following simple payload types.

SQL> CREATE TYPE demo_aq_array_ot AS OBJECT
  2  ( message VARCHAR2(4000) );
  3  /

Type created.

SQL> CREATE TYPE demo_aq_array_ntt
  2     AS TABLE OF demo_aq_array_ot;
  3  /

Type created.

Once we have defined our payload structure, we can set up and start our queue as follows.

SQL> BEGIN
  2     DBMS_AQADM.CREATE_QUEUE_TABLE (
  3        queue_table        => 'demo_aq_array_qt',
  4        queue_payload_type => 'demo_aq_array_ot'
  5        );
  6     DBMS_AQADM.CREATE_QUEUE (
  7        queue_name  => 'demo_aq_array_q',
  8        queue_table => 'demo_aq_array_qt'
  9        );
 10     DBMS_AQADM.START_QUEUE (
 11        queue_name => 'demo_aq_array_q'
 12        );
 13  END;
 14  /

PL/SQL procedure successfully completed.
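
For completeness, the demo objects can be removed again once all of the examples have been run. The following is a minimal sketch rather than part of the original demo; it assumes the object names created above and that the current schema owns them.

```sql
-- Sketch only: tear down the demo queue, queue table and payload types.
BEGIN
   -- A queue must be stopped before it can be dropped.
   DBMS_AQADM.STOP_QUEUE (
      queue_name => 'demo_aq_array_q'
      );
   DBMS_AQADM.DROP_QUEUE (
      queue_name => 'demo_aq_array_q'
      );
   -- force => TRUE allows the drop even if the table still holds messages.
   DBMS_AQADM.DROP_QUEUE_TABLE (
      queue_table => 'demo_aq_array_qt',
      force       => TRUE
      );
END;
/

-- Drop the collection type before the object type it depends on.
DROP TYPE demo_aq_array_ntt;
DROP TYPE demo_aq_array_ot;
```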

dbms_aq.enqueue_array

We'll begin by using the new DBMS_AQ.ENQUEUE_ARRAY function to enqueue 2,500 messages as a single transaction. Oracle developers will be aware that work done within a single bulk transaction will be more efficient than a series of atomic operations. This is particularly true of INSERTs, which is essentially what enqueues are in AQ.

The parameters to ENQUEUE_ARRAY are very similar to those of the existing ENQUEUE procedure, except that we pass an array of messages and corresponding message properties. Our bespoke collection type defines the message array. We can set the properties for each message in the array individually using a new MESSAGE_PROPERTIES_ARRAY_T type. A single record of enqueue options is applied to all messages.

Given this, we can now see an example of enqueuing an array of 2,500 messages. We will time this by the wall-clock and compare it to 2,500 individual enqueues. Note that the ENQUEUE_ARRAY function returns the number of messages that were enqueued together with a collection of message IDs as an OUT parameter.

SQL> set timing on
 
SQL> DECLARE
  2  
  3     r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  4     r_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
  5     nt_msg_properties DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
  6     nt_payloads       demo_aq_array_ntt;
  7     nt_msg_ids        DBMS_AQ.MSGID_ARRAY_T;
  8     v_enqueued_cnt    PLS_INTEGER;
  9  
 10  BEGIN
 11  
 12     /* Initialise the collections... */
 13     nt_payloads := demo_aq_array_ntt();
 14     nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
 15     nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();
 16  
 17     /* Load up 2500 arbitrary messages... */
 18     FOR i IN 1 .. 2500 LOOP
 19  
 20        nt_payloads.EXTEND;
 21        nt_payloads(nt_payloads.LAST) := demo_aq_array_ot(
 22                                            'Message number ' || TO_CHAR(i)
 23                                            );
 24  
 25        nt_msg_properties.EXTEND;
 26        nt_msg_properties(nt_msg_properties.LAST) := r_msg_properties;
 27  
 28     END LOOP;
 29  
 30     /* Enqueue the messages in a single transaction... */
 31     v_enqueued_cnt := DBMS_AQ.ENQUEUE_ARRAY(
 32                          queue_name               => 'demo_aq_array_q',
 33                          enqueue_options          => r_enqueue_options,
 34                          array_size               => nt_payloads.COUNT,
 35                          message_properties_array => nt_msg_properties,
 36                          payload_array            => nt_payloads,
 37                          msgid_array              => nt_msg_ids
 38                          );
 39     COMMIT;
 40  
 41     DBMS_OUTPUT.PUT_LINE(
 42        'Enqueued [' || TO_CHAR(v_enqueued_cnt) || '] messages.'
 43        );
 44  
 45     DBMS_OUTPUT.PUT_LINE(
 46        'Received [' || TO_CHAR(nt_msg_ids.COUNT) || '] message IDs.'
 47        );
 48  
 49  END;
 50  /
Enqueued [2500] messages.
Received [2500] message IDs.

PL/SQL procedure successfully completed.

Elapsed: 00:00:00.70

Before we compare this with 2,500 single message enqueues, we can confirm that our message array was enqueued as follows.

SQL> SELECT COUNT(*) FROM aq$demo_aq_array_qt;

  COUNT(*)
----------
      2500

1 row selected.
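
The same AQ$ view also exposes the state of each message, which can be a useful sanity check between tests. A minimal sketch, assuming the queue table created in the setup section (MSG_STATE is a column of the view that Oracle generates for the queue table):

```sql
-- Count the messages in the demo queue table by state (for example READY).
SELECT msg_state
,      COUNT(*) AS msg_count
FROM   aq$demo_aq_array_qt
GROUP  BY
       msg_state;
```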

We can now run two comparisons. First we will enqueue 2,500 single messages but commit once after all messages have been enqueued. This emulates the transaction behaviour of the array method.

SQL> set timing on
 
SQL> DECLARE
  2  
  3     r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  4     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  5     v_message_handle     RAW(16);
  6  
  7  BEGIN
  8  
  9     FOR i IN 1 .. 2500 LOOP
 10  
 11        DBMS_AQ.ENQUEUE(
 12           queue_name         => 'demo_aq_array_q',
 13           enqueue_options    => r_enqueue_options,
 14           message_properties => r_message_properties,
 15           payload            => demo_aq_array_ot('Message number ' || TO_CHAR(i)),
 16           msgid              => v_message_handle
 17           );
 18  
 19     END LOOP;
 20     COMMIT;
 21  
 22  END;
 23  /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.21

By the wall-clock alone, this takes approximately 70% longer than the new array enqueue (note also that the array example timing includes the population of the initial message collection). Of course, we are dealing with very short elapsed times in these examples, but higher-volume tests produce similar comparative results. For a second test, we can enqueue 2,500 single messages but commit each message as it is queued. This emulates the standard queuing behaviour of most AQ applications.

SQL> set timing on
 
SQL> DECLARE
  2  
  3     r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  4     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  5     v_message_handle     RAW(16);
  6  
  7  BEGIN
  8  
  9     FOR i IN 1 .. 2500 LOOP
 10  
 11        DBMS_AQ.ENQUEUE(
 12           queue_name         => 'demo_aq_array_q',
 13           enqueue_options    => r_enqueue_options,
 14           message_properties => r_message_properties,
 15           payload            => demo_aq_array_ot('Message number ' || TO_CHAR(i)),
 16           msgid              => v_message_handle
 17           );
 18  
 19        COMMIT;
 20  
 21     END LOOP;
 22  
 23  END;
 24  /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.73

As is probably to be expected, this is considerably "slower" (over twice the elapsed time of the array enqueue example). Commits can be expensive in highly transactional systems and 2,500 singleton inserts generate much more redo than the same number of rows being inserted in bulk. We can see the effect of the additional resources in these timings. As an exercise for the reader, compare the resource usage of the three methods using Tom Kyte's RUNSTATS utility (a version of which can be found on the Utilities page of this site).
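
As a starting point for that exercise, a single statistic can be measured directly. The following sketch is not the RUNSTATS utility; it simply takes a before-and-after snapshot of the session's "redo size" statistic around the single-commit enqueue loop. It assumes SELECT access to V$MYSTAT and V$STATNAME and the demo queue from the setup section, and the local function name redo_size is our own.

```sql
SET SERVEROUTPUT ON

DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   v_redo_before        NUMBER;
   v_redo_after         NUMBER;

   /* Hypothetical helper: this session's cumulative redo in bytes... */
   FUNCTION redo_size RETURN NUMBER IS
      v_value NUMBER;
   BEGIN
      SELECT ms.value
      INTO   v_value
      FROM   v$mystat   ms
      ,      v$statname sn
      WHERE  ms.statistic# = sn.statistic#
      AND    sn.name = 'redo size';
      RETURN v_value;
   END redo_size;

BEGIN

   v_redo_before := redo_size;

   /* The work being measured: 2,500 single enqueues, one commit... */
   FOR i IN 1 .. 2500 LOOP
      DBMS_AQ.ENQUEUE(
         queue_name         => 'demo_aq_array_q',
         enqueue_options    => r_enqueue_options,
         message_properties => r_message_properties,
         payload            => demo_aq_array_ot('Message number ' || TO_CHAR(i)),
         msgid              => v_message_handle
         );
   END LOOP;
   COMMIT;

   v_redo_after := redo_size;

   DBMS_OUTPUT.PUT_LINE(
      'Redo generated [' || TO_CHAR(v_redo_after - v_redo_before) || '] bytes.'
      );

END;
/
```

The same wrapper can be placed around the array enqueue and the commit-per-message loop to compare all three methods. Note that each run adds a further 2,500 messages to the queue, so it is best run separately from the article's examples.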

a note on options

Before we continue, it is worth noting that there is an enormous range of options with the new array-based functions, in addition to the existing enqueue-dequeue options, consumer options and notifications/listeners etc. For example, we can enqueue messages to a named transaction group which can be dequeued in isolation to increase concurrency. Alternatively, we can dequeue across groups. We can specify a boolean search condition to dictate whether to dequeue a message and also control the part of the queue that Oracle starts dequeuing from.

This introductory article uses default values for all examples, rather than attempt to demonstrate every possible combination of options.
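
For orientation only, the following is a minimal sketch of a few non-default dequeue options applied to an array dequeue. It assumes the demo queue and types from the setup section; the batch size of 10, the BROWSE mode and the search condition are arbitrary choices for illustration and are not used elsewhere in this article.

```sql
SET SERVEROUTPUT ON

DECLARE

   r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
   nt_msg_properties DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
   nt_payloads       demo_aq_array_ntt;
   nt_msg_ids        DBMS_AQ.MSGID_ARRAY_T;
   v_dequeued_cnt    PLS_INTEGER;
   v_dequeue_batch   PLS_INTEGER := 10;

   x_timeout EXCEPTION;
   PRAGMA EXCEPTION_INIT(x_timeout, -25228);

BEGIN

   /* Prepare collections... */
   nt_payloads := demo_aq_array_ntt();
   nt_payloads.EXTEND(v_dequeue_batch);
   nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
   nt_msg_properties.EXTEND(v_dequeue_batch);
   nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

   /* Read without removing, starting from the head of the queue... */
   r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
   r_dequeue_options.navigation   := DBMS_AQ.FIRST_MESSAGE;

   /* Do not wait if no message qualifies (ORA-25228 is raised)... */
   r_dequeue_options.wait := DBMS_AQ.NO_WAIT;

   /* Only consider messages whose payload matches this condition... */
   r_dequeue_options.deq_condition :=
      'tab.user_data.message LIKE ''Message number 1%''';

   v_dequeued_cnt := DBMS_AQ.DEQUEUE_ARRAY(
                        queue_name               => 'demo_aq_array_q',
                        dequeue_options          => r_dequeue_options,
                        array_size               => v_dequeue_batch,
                        message_properties_array => nt_msg_properties,
                        payload_array            => nt_payloads,
                        msgid_array              => nt_msg_ids
                        );

   DBMS_OUTPUT.PUT_LINE(
      'Browsed [' || TO_CHAR(v_dequeued_cnt) || '] messages.'
      );

EXCEPTION
   WHEN x_timeout THEN
      DBMS_OUTPUT.PUT_LINE('No matching messages.');
END;
/
```

Because the messages are only browsed, they remain on the queue and the counts in the other examples are unaffected.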

dbms_aq.dequeue_array

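Having enqueued 7,500 messages in the above examples, we can now dequeue them in batches. We will simply dequeue all messages in batches of up to 500 and write them to a message table. When there are no messages left, we will exit. We can determine whether there are any messages left to dequeue and process in one of two ways: either a dequeue returns fewer messages than the batch size we requested (scenario one), or a dequeue finds no messages at all and times out with ORA-25228 (scenario two).

The following example includes the code required for both scenarios (for information), but it is scenario two that we specifically need to cater for. This is because 7,500 is an exact multiple of our batch size, so the final batch will be full and only the timeout will tell us that the queue is empty. As stated, the dequeued messages will be written to a message table, which is created as follows.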

SQL> CREATE TABLE demo_aq_array_mt
  2  ( message VARCHAR2(4000)
  3  , ts      TIMESTAMP(3)
  4  );

Table created.

Now we can dequeue the messages in batches of 500.

SQL> DECLARE
  2  
  3     r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  4     nt_msg_properties DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
  5     nt_payloads       demo_aq_array_ntt;
  6     nt_msg_ids        DBMS_AQ.MSGID_ARRAY_T;
  7     v_dequeued_cnt    PLS_INTEGER;
  8     v_dequeue_batch   PLS_INTEGER := 500;
  9     v_continue        BOOLEAN := TRUE;
 10  
 11     x_timeout EXCEPTION;
 12     PRAGMA EXCEPTION_INIT(x_timeout, -25228);
 13  
 14  BEGIN
 15  
 16     /* Prepare collections... */
 17     nt_payloads := demo_aq_array_ntt();
 18     nt_payloads.EXTEND(v_dequeue_batch);
 19     nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
 20     nt_msg_properties.EXTEND(v_dequeue_batch);
 21     nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();
 22  
 23     /* Set a 5 second timeout for the dequeue... */
 24     r_dequeue_options.wait := 5;
 25  
 26     /* Dequeue all messages in batches... */
 27     WHILE v_continue LOOP
 28  
 29        BEGIN
 30  
 31           v_dequeued_cnt := DBMS_AQ.DEQUEUE_ARRAY(
 32                                queue_name               => 'demo_aq_array_q',
 33                                dequeue_options          => r_dequeue_options,
 34                                array_size               => v_dequeue_batch,
 35                                message_properties_array => nt_msg_properties,
 36                                payload_array            => nt_payloads,
 37                                msgid_array              => nt_msg_ids
 38                                );
 39  
 40           DBMS_OUTPUT.PUT_LINE(
 41              'Dequeued [' || TO_CHAR(v_dequeued_cnt) || '] messages.'
 42              );
 43  
 44           INSERT INTO demo_aq_array_mt
 45           SELECT nt.message, SYSTIMESTAMP
 46           FROM   TABLE(nt_payloads) nt;
 47  
 48           DBMS_OUTPUT.PUT_LINE(
 49              'Inserted [' || TO_CHAR(SQL%ROWCOUNT) || '] messages.'
 50              );
 51  
 52           /* Handle exit scenario one from the notes above... */
 53           v_continue := (v_dequeued_cnt = v_dequeue_batch);
 54  
 55           /* Commit the batch... */
 56           COMMIT;
 57  
 58        EXCEPTION
 59  
 60           /* Handle exit scenario two from the notes above... */
 61           WHEN x_timeout THEN
 62              DBMS_OUTPUT.PUT_LINE(
 63                 'No more messages to dequeue.'
 64                 );
 65              v_continue := FALSE;
 66  
 67        END;
 68  
 69     END LOOP;
 70  
 71  END;
 72  /
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
Dequeued [500] messages.
Inserted [500] messages.
No more messages to dequeue.

PL/SQL procedure successfully completed.

We have now dequeued our example messages and written them to the message table. Out of interest, we can verify the number of messages dequeued and see the elapsed time as follows.

SQL> SELECT COUNT(*)
  2  ,      MIN(ts) AS first_batch_written
  3  ,      MAX(ts) AS last_batch_written
  4  FROM   demo_aq_array_mt;

  COUNT(*) FIRST_BATCH_WRITTEN        LAST_BATCH_WRITTEN
---------- -------------------------- --------------------------
      7500 29-JUL-05 21.28.01.031     29-JUL-05 21.28.05.671

1 row selected.

This is a simple confirmation that we collected all of our messages and the entire operation took approximately 5 seconds.

The dequeue example was designed to demonstrate the new array feature. Note, however, that dequeuing in batches would possibly be better combined with notification (via callback procedure), rather than managed by a standalone unit. See this introduction to AQ for an example of a callback procedure that automatically dequeues messages via notification. As a strategy note, a callback procedure that dequeues every message from a queue might hit redundant notifications. This is because each enqueue operation will trigger a notification but the first execution of the callback procedure will potentially dequeue everything. Later executions will then find nothing to dequeue, and the resulting exception (ORA-25228 when the dequeue times out or does not wait) will need to be handled.
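
To illustrate the strategy note, the following is a minimal sketch of such a callback. The procedure name and batch size are hypothetical, the parameter list is the standard one for PL/SQL notification callbacks, and registering the callback against the queue is omitted. It assumes the demo queue and types from this article and treats an empty queue as a normal outcome rather than an error.

```sql
CREATE OR REPLACE PROCEDURE demo_aq_array_callback (
   context  IN RAW,
   reginfo  IN SYS.AQ$_REG_INFO,
   descr    IN SYS.AQ$_DESCRIPTOR,
   payload  IN RAW,
   payloadl IN NUMBER
   ) AS

   r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
   nt_msg_properties DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
   nt_payloads       demo_aq_array_ntt;
   nt_msg_ids        DBMS_AQ.MSGID_ARRAY_T;
   v_dequeued_cnt    PLS_INTEGER;
   v_dequeue_batch   PLS_INTEGER := 500;

   x_timeout EXCEPTION;
   PRAGMA EXCEPTION_INIT(x_timeout, -25228);

BEGIN

   /* Prepare collections... */
   nt_payloads := demo_aq_array_ntt();
   nt_payloads.EXTEND(v_dequeue_batch);
   nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
   nt_msg_properties.EXTEND(v_dequeue_batch);
   nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();

   /* Never block inside a callback: return at once if the queue is empty... */
   r_dequeue_options.wait := DBMS_AQ.NO_WAIT;

   /* Drain the queue in batches, whatever triggered this notification... */
   LOOP

      v_dequeued_cnt := DBMS_AQ.DEQUEUE_ARRAY(
                           queue_name               => 'demo_aq_array_q',
                           dequeue_options          => r_dequeue_options,
                           array_size               => v_dequeue_batch,
                           message_properties_array => nt_msg_properties,
                           payload_array            => nt_payloads,
                           msgid_array              => nt_msg_ids
                           );

      /* Process nt_payloads here, then commit the batch... */
      COMMIT;

      EXIT WHEN v_dequeued_cnt < v_dequeue_batch;

   END LOOP;

EXCEPTION
   WHEN x_timeout THEN
      /* Redundant notification: an earlier execution emptied the queue... */
      NULL;
END demo_aq_array_callback;
/
```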

dequeue timings

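As with the enqueue examples, we can compare the array method against the wall-clock. We will enqueue 7,500 messages but this time, dequeue them using the three methods we adopted for the enqueue example: a single array dequeue with one commit; single-message dequeues with one commit at the end; and single-message dequeues with a commit after each message. Each method will dequeue 2,500 messages.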

First we need to enqueue 7,500 messages.

SQL> DECLARE
  2
  3     r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  4     r_msg_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
  5     nt_msg_properties DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
  6     nt_payloads       demo_aq_array_ntt;
  7     nt_msg_ids        DBMS_AQ.MSGID_ARRAY_T;
  8     v_enqueued_cnt    PLS_INTEGER;
  9
 10  BEGIN
 11
 12     /* Initialise the collections... */
 13     nt_payloads := demo_aq_array_ntt();
 14     nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
 15     nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();
 16
 17     /* Load up 7500 arbitrary messages... */
 18     FOR i IN 1 .. 7500 LOOP
 19
 20        nt_payloads.EXTEND;
 21        nt_payloads(nt_payloads.LAST) := demo_aq_array_ot(
 22                                            'Message number ' || TO_CHAR(i)
 23                                            );
 24
 25        nt_msg_properties.EXTEND;
 26        nt_msg_properties(nt_msg_properties.LAST) := r_msg_properties;
 27
 28     END LOOP;
 29
 30     /* Enqueue the messages in a single transaction... */
 31     v_enqueued_cnt := DBMS_AQ.ENQUEUE_ARRAY(
 32                          queue_name               => 'demo_aq_array_q',
 33                          enqueue_options          => r_enqueue_options,
 34                          array_size               => nt_payloads.COUNT,
 35                          message_properties_array => nt_msg_properties,
 36                          payload_array            => nt_payloads,
 37                          msgid_array              => nt_msg_ids
 38                          );
 39     COMMIT;
 40
 41     DBMS_OUTPUT.PUT_LINE(
 42        'Enqueued [' || TO_CHAR(v_enqueued_cnt) || '] messages.'
 43        );
 44
 45  END;
 46  /
Enqueued [7500] messages.

PL/SQL procedure successfully completed.

Now we can dequeue 2,500 messages in a single array. Note that we do not do anything with these messages as this is a wall-clock timing of the dequeue operation itself.

SQL> set timing on

SQL> DECLARE
  2
  3     r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  4     nt_msg_properties DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
  5     nt_payloads       demo_aq_array_ntt;
  6     nt_msg_ids        DBMS_AQ.MSGID_ARRAY_T;
  7     v_dequeued_cnt    PLS_INTEGER;
  8     v_dequeue_batch   PLS_INTEGER := 2500;
  9
 10  BEGIN
 11
 12     /* Prepare collections... */
 13     nt_payloads := demo_aq_array_ntt();
 14     nt_payloads.EXTEND(v_dequeue_batch);
 15     nt_msg_properties := DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T();
 16     nt_msg_properties.EXTEND(v_dequeue_batch);
 17     nt_msg_ids := DBMS_AQ.MSGID_ARRAY_T();
 18
 19     /* Dequeue 2,500 messages... */
 20     v_dequeued_cnt := DBMS_AQ.DEQUEUE_ARRAY(
 21                          queue_name               => 'demo_aq_array_q',
 22                          dequeue_options          => r_dequeue_options,
 23                          array_size               => v_dequeue_batch,
 24                          message_properties_array => nt_msg_properties,
 25                          payload_array            => nt_payloads,
 26                          msgid_array              => nt_msg_ids
 27                          );
 28
 29     DBMS_OUTPUT.PUT_LINE(
 30        'Dequeued [' || TO_CHAR(v_dequeued_cnt) || '] messages.'
 31        );
 32
 33     COMMIT;
 34
 35  END;
 36  /
Dequeued [2500] messages.

PL/SQL procedure successfully completed.

Elapsed: 00:00:00.89

Now we have our baseline timing, we can dequeue a further 2,500 messages but using the single-message DEQUEUE procedure. We'll only commit when all messages have been dequeued.

SQL> set timing on

SQL> DECLARE
  2
  3     r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  4     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  5     v_message_handle     RAW(16);
  6     o_payload            demo_aq_array_ot;
  7
  8  BEGIN
  9
 10     FOR i IN 1 .. 2500 LOOP
 11
 12        DBMS_AQ.DEQUEUE(
 13           queue_name         => 'demo_aq_array_q',
 14           dequeue_options    => r_dequeue_options,
 15           message_properties => r_message_properties,
 16           payload            => o_payload,
 17           msgid              => v_message_handle
 18           );
 19
 20     END LOOP;
 21
 22     COMMIT;
 23
 24  END;
 25  /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.21

This takes approximately 35% longer than the array-based dequeue (1.21 seconds against 0.89 seconds). Finally, we can dequeue the remaining messages but commit each time we collect a message.

SQL> set timing on

SQL> DECLARE
  2
  3     r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  4     r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  5     v_message_handle     RAW(16);
  6     o_payload            demo_aq_array_ot;
  7
  8  BEGIN
  9
 10     FOR i IN 1 .. 2500 LOOP
 11
 12        DBMS_AQ.DEQUEUE(
 13           queue_name         => 'demo_aq_array_q',
 14           dequeue_options    => r_dequeue_options,
 15           message_properties => r_message_properties,
 16           payload            => o_payload,
 17           msgid              => v_message_handle
 18           );
 19
 20        COMMIT;
 21
 22     END LOOP;
 23
 24  END;
 25  /

PL/SQL procedure successfully completed.

Elapsed: 00:00:01.62

This method takes approximately twice as long as the array dequeue method. The overall time comparisons are similar to those of the enqueue examples above.

array does not imply bulk

Our understanding of array-based processing is that it is more efficient; in fact, we can see improved elapsed times in our enqueue and dequeue examples. However, if we run our ENQUEUE_ARRAY and DEQUEUE_ARRAY examples with SQL trace enabled, we see something surprising. The following TKProf output shows the ENQUEUE_ARRAY example run at level 12 SQL trace (event 10046).


insert into "SCOTT"."DEMO_AQ_ARRAY_QT"  (q_name, msgid, corrid, priority, 
  state, delay, expiration,   time_manager_info, local_order_no, chain_no, 
  enq_time, step_no, enq_uid,   enq_tid, retry_count, exception_qschema, 
  exception_queue, recipient_key,   dequeue_msgid, user_data, sender_name, 
  sender_address, sender_protocol,   user_prop, cscn, dscn)   
values
 (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11, :12, :13, :14, 0, :15,        
   :16, :17, :18, :19, :20, :21, :22, :23, :24, :25) returning rowid into :rd

call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        1      0.00       0.00          0          0          0           0
Execute      1      0.62       0.74         13         84      13371        2500
Fetch        0      0.00       0.00          0          0          0           0
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total        2      0.62       0.74         13         84      13371        2500

Misses in library cache during parse: 1
Misses in library cache during execute: 1
Optimizer mode: CHOOSE
Parsing user id: SYS   (recursive depth: 1)

We can see that our entire message array was inserted into the queue table in a single insert, which is what we expect. However, if we now look at the dequeue trace, we might be surprised.


delete from "SCOTT"."DEMO_AQ_ARRAY_QT" 
where
 rowid = :1

call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        0      0.00       0.00          0          0          0           0
Execute   2500      0.32       0.27          0       2501      13093        2500
Fetch        0      0.00       0.00          0          0          0           0
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total     2500      0.32       0.27          0       2501      13093        2500

Misses in library cache during parse: 0
Optimizer mode: CHOOSE
Parsing user id: SYS   (recursive depth: 1)

This time there is no bulk operation at all. Each message in our array was dequeued individually, as proven by the 2,500 executes of this delete statement. Furthermore, the following select statement precedes the delete, which tells us that Oracle is fetching the data off the queue (i.e. the queue table) first so it can lock it for delete. With 2,504 fetch calls for 2,500 rows, it is not fetching the data in bulk either, and it is not using subsequent array-based deletes.


select  /*+ INDEX(TAB) */   tab.rowid, tab.msgid, tab.corrid, tab.priority, 
  tab.delay,   tab.expiration, tab.retry_count, tab.exception_qschema,   
  tab.exception_queue, tab.chain_no, tab.local_order_no, tab.enq_time,   
  tab.time_manager_info, tab.state, tab.enq_tid, tab.step_no,   
  tab.sender_name, tab.sender_address, tab.sender_protocol,   
  tab.dequeue_msgid, tab.user_prop, tab.cscn, tab.dscn, tab.user_data  
from
 "SCOTT"."DEMO_AQ_ARRAY_QT" tab  where q_name = :1 and state = :2  order by 
  q_name, state, enq_time, step_no, chain_no, local_order_no  for update skip 
  locked

call     count       cpu    elapsed       disk      query    current        rows
------- ------  -------- ---------- ---------- ---------- ----------  ----------
Parse        0      0.00       0.00          0          0          0           0
Execute      4      0.00       0.00          0          0          0           0
Fetch     2504      0.23       0.20          0       5187       2510        2500
------- ------  -------- ---------- ---------- ---------- ----------  ----------
total     2508      0.23       0.20          0       5187       2510        2500

Misses in library cache during parse: 0
Optimizer mode: CHOOSE
Parsing user id: SYS   (recursive depth: 1)

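This is one to look out for in future releases, where it will be worth re-running the trace to see whether DEQUEUE_ARRAY has started to use bulk operations.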
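
To reproduce the traces in this section, extended SQL trace can be enabled for the session before running one of the examples. A minimal sketch, assuming the ALTER SESSION privilege; the file names in the TKProf comment are placeholders:

```sql
-- Enable level 12 SQL trace (binds and waits) for the current session.
ALTER SESSION SET EVENTS '10046 trace name context forever, level 12';

-- ... run the ENQUEUE_ARRAY or DEQUEUE_ARRAY example here ...

-- Switch the trace off again.
ALTER SESSION SET EVENTS '10046 trace name context off';

-- Then format the raw trace file from the operating system. Recursive SYS
-- statements must be kept in the report (the TKProf default), because the
-- queue table DML is issued recursively. For example:
--    tkprof <trace_file>.trc <output_file>.txt
```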

further reading

For general information on Advanced Queuing, refer to the Advanced Queuing Guide. For information on the new ENQUEUE_ARRAY and DEQUEUE_ARRAY functions described in this article, read the DBMS_AQ reference.

acknowledgements

Credit goes to Job Miller for pointing out the underlying behaviour of the DEQUEUE_ARRAY function.

source code

The source code for the examples in this article can be downloaded from here.

Adrian Billington, July 2005
