oracle的并行管道函数

并行管道函数
这个例子中要使用两个表:T1和T2。T1是先读的表,T2表用来插入这个信息。我们要用的两个表如下:

sys@JINGYONG> create table t1
  2  as
  3  select object_id id,object_name text
  4  from all_objects;

表已创建。

sys@JINGYONG> begin
  2  dbms_stats.set_table_stats
  3  (user,'T1',numrows=>100000000,numblks=>100000);
  4  end;
  5  /

PL/SQL 过程已成功完成。

sys@JINGYONG> create table t2
  2  as
  3  select t1.*,0 session_id
  4  from t1
  5  where 1=0;

表已创建。

这里使用DBMS_STATS来骗过优化器,让它以为输入表中有10,000,000行,而且占用了100,000个数据库块。在此模拟 一个大表。第二个表T2是第一个表的一个副本,只是在结构中增加了一个SESSION_ID列。可以通过它具体看到是否发生了并行化。接下来,需要建立管道函数返回的对象类型。在这个例子中,对象类型类似于T2:

sys@JINGYONG> create or replace type t2_type
  2  as object
  3  (
  4  id number,
  5  text varchar2(30),
  6  session_id number
  7  );
  8  /

类型已创建。

sys@JINGYONG> create or replace type t2_tab_type as table of t2_type;
  2  /

类型已创建。

现在这个过程是一个生成行的函数。它接收数据作为输入,并在一个引用游标(ref cursor)中处理。这个函数返回一个 T2_TAB_TYPE,这就是我们刚才创建的对象类型。这是一个PARALLEL_ENABLED(启用子并行)的管道函数。在此使用了分区 (partition)子句,这就告诉Oracle:以任何最合适的方式划分或分解数据。我们不需要对数据的顺序做任何假设。

在此,我们只想划分数据。数据如何划分对于我们的处理并不重要,所以定义如下:

sys@JINGYONG> create or replace function parallel_pipelined(l_cursor in sys_refcursor)
  2  return t2_tab_type
  3  pipelined
  4  parallel_enable(partition l_cursor by any)
  5  is
  6   l_session_id number;
  7   TYPE type_t1_data IS TABLE OF t1%ROWTYPE INDEX BY PLS_INTEGER;
  8   l_t1  type_t1_data;
  9
 10  begin
 11  select sid into l_session_id
 12  from v$mystat
 13  where rownum=1;
 14  loop
 15    fetch l_cursor bulk collect into l_t1;--用bulk collect来一次性获取数据
 16    exit when l_t1.count=0;
 17    for i in 1 .. l_t1.count loop
 18          pipe row(t2_type(l_t1(i).id,l_t1(i).text,l_session_id));
 19    end loop;
 20    null;
 21  end loop;
 22  close l_cursor;
 23  return;
 24  end;
 25  /

Function created

或者用下面的过程来一行一行来获取

create or replace
function parallel_pipelined( l_cursor in sys_refcursor )
return t2_tab_type
pipelined
parallel_enable ( partition l_cursor by any )
is
 l_session_id number;
 l_rec t1%rowtype;
begin
 select sid into l_session_id
 from v$mystat
 where rownum =1;
 loop
 fetch l_cursor into l_rec;
 exit when l_cursor%notfound;
 pipe row(t2_type(l_rec.id,l_rec.text,l_session_id));
 end loop;
 close l_cursor;
 return;
end;

这样就创建了函数。我们准备并行地处理数据,让Oracle根据可用的资源来确定最合适的并行度:

SQL> insert /*+ append */
  2  into t2(id,text,session_id)
  3   select *
  4   from table(parallel_pipelined
  5   (CURSOR(select /*+ parallel(t1) */ *
  6   from t1 )
  7  ))
  8  ;

50333 rows inserted

SQL> commit;

Commit complete

为了查看这里发生了什么,可以查询新插入的数据,并按SESSION_ID分组,先来看使用了多少个并行执行服务器,再看每个并行 执行服务器处理了多少行:

SQL> select session_id,count(*) from t2 group by session_id;

SESSION_ID   COUNT(*)
---------- ----------
       136      31006
       145      19327

显然,对于这个并行操作的SELECT部分,我们使用了2个并行执行服务器,可以看到,Oracle对我们的过程进行了并行化

发表评论

电子邮件地址不会被公开。