Sunday, April 6, 2014

Analysing Parallel Execution Skew - Without Diagnostics / Tuning Pack License

This is the third part of the video tutorial "Analysing Parallel Execution Skew". In this part I show how to analyse a parallel SQL execution for Parallel Execution Skew.

If you don't have a Diagnostics / Tuning Pack license, the options for doing so are quite limited, and the approach demonstrated in the tutorial has several limitations and shortcomings.

Here is a link to the video on my YouTube channel.

If you want to reproduce or play around with the examples shown in the tutorial, here is the script for creating the tables and running the queries / DML commands used in the tutorial. A shout out goes to Christo Kutrovsky at Pythian, who I think was the one who inspired the beautified version of the V$PQ_TQSTAT query.
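
One caveat to keep in mind throughout: V$PQ_TQSTAT is only populated in the private memory of the session that executed the parallel statement, and only after the statement has completed, so it has to be queried from the same session right afterwards and only reflects the most recent parallel execution of that session. Stripped of all beautification, the check boils down to something like this, a minimal sketch of what the full query in the script does:

select
        dfo_number
      , tq_id
      , server_type
      , process
      , num_rows
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , process
;

A significant imbalance of NUM_ROWS between the processes of the same table queue and server type is the tell-tale sign of Parallel Execution Skew.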

---------------------
-- Links for S-ASH --
---------------------
--
-- http://www.perfvision.com/ash.php
-- http://www.pythian.com/blog/trying-out-s-ash/
-- http://sourceforge.net/projects/orasash/files/v2.3/
-- http://sourceforge.net/projects/ashv/
---------------------
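
-- Note: S-ASH ("Simulated ASH", see the links above) samples V$SESSION into
-- its own repository and can serve as a free substitute for Active Session
-- History if you don't have a Diagnostics Pack license.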

-- Table creation
set echo on timing on time on

drop table t_1;

purge table t_1;

drop table t_2;

purge table t_2;

drop table t_1_part;

purge table t_1_part;

drop table t_2_part;

purge table t_2_part;

drop table t1;

purge table t1;

drop table t2;

purge table t2;

drop table t3;

purge table t3;

drop table t4;

purge table t4;

drop table t5;

purge table t5;

drop table x;

purge table x;
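
-- Note: the tables created below end up as follows: T1 / T3 / T5 with 2,000
-- rows each (1000 x 2), T2 / T4 with 2,000,000 rows each (1e6 x 2, compressed).
-- The CARDINALITY hints merely describe the row counts generated via
-- CONNECT BY to the optimizer.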

create table t1
as
select  /*+ use_nl(a b) */
        (rownum * 2) as id
      , rownum as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000) */ * from dual
connect by
        level <= 1000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't1')

alter table t1 cache;

create table t2
compress
as
select
        (rownum * 2) + 1 as id
      , mod(rownum, 2000) + 1 as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000000) */ * from dual
connect by
        level <= 1000000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't2')

alter table t2 cache;

create table t3
as
select  /*+ use_nl(a b) */
        (rownum * 2) as id
      , rownum as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000) */ * from dual
connect by
        level <= 1000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't3')

alter table t3 cache;

create table t4
compress
as
select
        (rownum * 2) + 1 as id
      , mod(rownum, 2000) + 1 as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000000) */ * from dual
connect by
        level <= 1000000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't4')

alter table t4 cache;

create table t5
as
select  /*+ use_nl(a b) */
        (rownum * 2) as id
      , rownum as id2
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1000) */ * from dual
connect by
        level <= 1000) a, (select /*+ cardinality(2) */ * from dual connect by level <= 2) b
;

exec dbms_stats.gather_table_stats(null, 't5')

alter table t5 cache;

create table x
compress
as
select * from t2
where 1 = 2;

create unique index x_idx1 on x (id);
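
-- X is an (initially empty) copy of T2's structure and serves as the target
-- table of the parallel DML example further below.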

alter table t1 parallel 2;

alter table t2 parallel 2;

alter table t3 parallel 15;

alter table t4 parallel 15;

alter table t5 parallel 15;

create table t_1
compress
as
select  /*+ use_nl(a b) */
        rownum as id
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_1')

create table t_2
compress
as
select
        rownum as id
      , case when rownum <= 5e5 then mod(rownum, 2e6) + 1 else 1 end as fk_id_skew
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_2', method_opt=>'for all columns size 1', no_invalidate=>false)

alter table t_1 parallel 8 cache;

alter table t_2 parallel 8 cache;

create table t_1_part
partition by hash(id) partitions 8
compress
as
select  /*+ use_nl(a b) */
        rownum as id
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_1_part')

create table t_2_part
partition by hash(fk_id_skew) partitions 8
compress
as
select
        rownum as id
      , case when rownum <= 5e5 then mod(rownum, 2e6) + 1 else 1 end as fk_id_skew
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

exec dbms_stats.gather_table_stats(null, 't_2_part', method_opt=>'for all columns size 1', no_invalidate=>false)

alter table t_1_part parallel 8 cache;

alter table t_2_part parallel 8 cache;
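
-- Note: T_2 and T_2_PART hold 2e6 rows each (1e5 x 20). The CASE expression
-- assigns FK_ID_SKEW = 1 to every row with ROWNUM > 5e5, so 1.5e6 out of 2e6
-- rows share a single join key value - this is the skew the following
-- examples are meant to expose, and it also means one hash partition of
-- T_2_PART is far bigger than the other seven.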

---------------------------------------------------------------
-- Single DFO tree (with Parallel Execution Skew), many DFOs --
---------------------------------------------------------------

set echo on timing on time on verify on

define num_cpu = "14"

alter session set workarea_size_policy = manual;
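
-- Note: the SORT_AREA_SIZE / HASH_AREA_SIZE commands are deliberately issued
-- twice here (and in the later sections) - as far as I know a workaround for
-- the new value not always being picked up by the next statement otherwise.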

alter session set sort_area_size = 200000000;

alter session set sort_area_size = 200000000;

alter session set hash_area_size = 200000000;

alter session set hash_area_size = 200000000;

select
        max(t1_id)
      , max(t1_filler)
      , max(t2_id)
      , max(t2_filler)
      , max(t3_id)
      , max(t3_filler)
from    (
          select  /*+ monitor
                     no_merge
                     no_merge(v_1)
                     no_merge(v_5)
                     parallel(t1 &num_cpu)
                     PQ_DISTRIBUTE(T1 HASH HASH)
                     PQ_DISTRIBUTE(V_5 HASH HASH)
                     leading (v_1 v_5 t1)
                     use_hash(v_1 v_5 t1)
                     swap_join_inputs(t1)
                  */
                  t1.id     as t1_id
                , regexp_replace(v_5.t3_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') as t1_filler
                , v_5.*
          from    (
                    select  /*+ parallel(t2 &num_cpu)
                                parallel(t3 &num_cpu)
                                leading(t3 t2)
                                use_hash(t3 t2)
                                swap_join_inputs(t2)
                                PQ_DISTRIBUTE(T2 HASH HASH)
                            */
                            t2.id     as t2_id
                          , t2.filler as t2_filler
                          , t2.id2    as t2_id2
                          , t3.id     as t3_id
                          , t3.filler as t3_filler
                    from
                            t1 t2
                          , t2 t3
                    where
                            t3.id2 = t2.id2 (+)
                    and     regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
                    and     mod(t3.id2, 3) = 0
                  ) v_1
                , (
                    select  /*+ parallel(t2 &num_cpu)
                                parallel(t3 &num_cpu)
                                leading(t3 t2)
                                use_hash(t3 t2)
                                swap_join_inputs(t2)
                                PQ_DISTRIBUTE(T2 HASH HASH)
                            */
                            t2.id     as t2_id
                          , t2.filler as t2_filler
                          , t2.id2    as t2_id2
                          , t3.id     as t3_id
                          , t3.filler as t3_filler
                    from
                            t1 t2
                          , t2 t3
                    where
                            t3.id = t2.id (+)
                    and     regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
                    and     mod(t3.id2, 3) = 0
                  ) v_5
                , t1
          where
                  v_1.t3_id = v_5.t3_id
          and     v_5.t2_id2 = t1.id2 (+) + 2001
          and     regexp_replace(v_5.t3_filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t1.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
        )
;
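
-- Important: run the following query from the same session immediately after
-- the parallel statement has completed - V$PQ_TQSTAT is session-private and
-- reflects only the most recent parallel execution. With the HASH
-- distributions forced above, Parallel Execution Skew shows up as a single
-- process receiving a disproportionate NUM_ROWS share (see the "%" and
-- GRAPH columns).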

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

---------------------------------------------------------------------------------------------------
-- Same statement with Parallel TEMP TABLE TRANSFORMATION, V$PQ_TQSTAT shows useless information --
---------------------------------------------------------------------------------------------------
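
-- Note: the TEMP TABLE TRANSFORMATION splits the statement into separate DFO
-- trees - one for loading the temporary table, one for querying it - and the
-- V$PQ_TQSTAT content afterwards no longer maps cleanly to the actual
-- execution, hence "useless information".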

set echo on timing on time on verify on

define num_cpu = "14"

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 200000000;

alter session set sort_area_size = 200000000;

alter session set hash_area_size = 200000000;

alter session set hash_area_size = 200000000;

with result as
(
select /*+ materialize
           monitor
           no_merge
           no_merge(v_1)
           no_merge(v_5)
           parallel(t1 &num_cpu)
           PQ_DISTRIBUTE(T1 HASH HASH)
           PQ_DISTRIBUTE(V_1 HASH HASH)
           PQ_DISTRIBUTE(V_5 HASH HASH)
           leading (v_1 v_5 t1)
           use_hash(v_1 v_5 t1)
           swap_join_inputs(t1)
       */
       t1.id     as t1_id
     , regexp_replace(v_5.t3_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') as t1_filler
     , v_5.*
from   (
  select /*+ parallel(t2 &num_cpu) parallel(t3 &num_cpu) leading(t3 t2) use_hash(t3 t2) swap_join_inputs(t2) PQ_DISTRIBUTE(T2 HASH HASH) */
        t2.id     as t2_id
      , t2.filler as t2_filler
      , t2.id2    as t2_id2
      , t3.id     as t3_id
      , t3.filler as t3_filler
  from
        t1 t2
      , t2 t3
  where
        t3.id2 = t2.id2 (+)
  and   regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
  and   mod(t3.id2, 3) = 0
) v_1
     , (
  select /*+ parallel(t2 &num_cpu) parallel(t3 &num_cpu) leading(t3 t2) use_hash(t3 t2) swap_join_inputs(t2) PQ_DISTRIBUTE(T2 HASH HASH) */
        t2.id     as t2_id
      , t2.filler as t2_filler
      , t2.id2    as t2_id2
      , t3.id     as t3_id
      , t3.filler as t3_filler
  from
        t1 t2
      , t2 t3
  where
        t3.id = t2.id (+)
  and   regexp_replace(t3.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
  and   mod(t3.id2, 3) = 0
) v_5
     , t1
where
       v_1.t3_id = v_5.t3_id
and    v_5.t2_id2 = t1.id2 (+) + 2001
and    regexp_replace(v_5.t3_filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t1.filler (+), '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
)
select max(t1_id), max(t1_filler), max(t2_id), max(t2_filler), max(t3_id), max(t3_filler)
from   result;

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

--------------------------------------------------------------------------------------------------
-- This construct results in misleading information from V$PQ_TQSTAT (actually a complete mess) --
--------------------------------------------------------------------------------------------------
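
-- Note: the INSERT below deliberately mixes degrees of parallelism
-- (PARALLEL(X 4) vs. PARALLEL(T2 4) and PARALLEL(T2 8)) and uses ROWNUM
-- inside NO_MERGE views, which results in multiple DFO trees / parallelizers
-- at runtime - one reason the V$PQ_TQSTAT output afterwards is so hard to
-- interpret.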

set echo on timing on time on

alter session enable parallel dml;

truncate table x;

insert  /*+ append parallel(x 4) */ into x
select  /*+ leading(v1 v2) optimizer_features_enable('11.2.0.1') */
        v_1.id
      , v_1.id2
      , v_1.filler
from    (
           select
                   id
                 , id2
                 , filler
           from    (
                      select  /*+ parallel(t2 4) no_merge */
                              rownum as id
                            , t2.id2
                            , t2.filler
                      from
                              t2
                      where
                              mod(t2.id2, 3) = 0
                      and     regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
                   ) v1
        ) v_1
      , (
          select
                   id
                 , id2
                 , filler
          from     (
                     select  /*+ parallel(t2 8) no_merge */
                             rownum as id
                           , t2.id2
                           , t2.filler
                     from
                             t2
                     where
                             mod(t2.id2, 3) = 0
                     and     regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') = regexp_replace(t2.filler, '^([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
                   ) v2
        ) v_2
where
        v_1.id = v_2.id
and     v_1.filler = v_2.filler
;

-- Parallel DML requires a COMMIT before querying V$PQ_TQSTAT
commit;

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

compute sum label Total of num_rows on server_type

select
        dfo_number
      , tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

----------------------------------------------------------------------
-- Single DFO tree (with Parallel Execution Skew, almost no impact) --
----------------------------------------------------------------------

set echo on timing on time on

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 500000000;

alter session set sort_area_size = 500000000;

alter session set hash_area_size = 500000000;

alter session set hash_area_size = 500000000;

select  /*+ leading(v1)
            use_hash(t_1)
            no_swap_join_inputs(t_1)
            pq_distribute(t_1 hash hash)
        */
        max(t_1.filler)
      , max(v1.t_1_filler)
      , max(v1.t_2_filler)
from
        t_1
      , (
          select  /*+ no_merge
                      leading(t_1 t_2)
                      use_hash(t_2)
                      no_swap_join_inputs(t_2)
                      pq_distribute(t_2 hash hash) */
                  t_1.id as t_1_id
                , t_1.filler as t_1_filler
                , t_2.id as t_2_id
                , t_2.filler as t_2_filler
          from    t_1
                , t_2
          where
                  t_2.fk_id_skew = t_1.id
        ) v1
where
        v1.t_2_id = t_1.id
and     regexp_replace(v1.t_2_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
and     regexp_replace(v1.t_2_filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'i')
;
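
-- Even though the skewed distribution of T_2.FK_ID_SKEW has almost no impact
-- on the overall runtime here, the NUM_ROWS imbalance should still be clearly
-- visible in the V$PQ_TQSTAT output below.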

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;

--------------------------------------------------------------------------------------------------------------------------------
-- Full Partition Wise Join with partition skew - V$PQ_TQSTAT is of no help, since no redistribution takes place (single DFO) --
--------------------------------------------------------------------------------------------------------------------------------

set echo on timing on time on

alter session set workarea_size_policy = manual;

alter session set sort_area_size = 500000000;

alter session set sort_area_size = 500000000;

alter session set hash_area_size = 500000000;

alter session set hash_area_size = 500000000;

select count(t_2_filler) from (
select  /*+ monitor
            leading(t_1 t_2)
            use_hash(t_2)
            no_swap_join_inputs(t_2)
            pq_distribute(t_2 none none)
        */
        t_1.id as t_1_id
      , t_1.filler as t_1_filler
      , t_2.id as t_2_id
      , t_2.filler as t_2_filler
from    t_1_part t_1
      , t_2_part t_2
where
        t_2.fk_id_skew = t_1.id
and     regexp_replace(t_2.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
);
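
-- Since the full partition-wise join doesn't redistribute any data, the only
-- table queue left is the final one towards the query coordinator, so
-- V$PQ_TQSTAT cannot show the partition skew. As a minimal alternative sketch
-- (assuming statistics are current), the per-partition row counts expose the
-- skew directly:

select
        table_name
      , partition_name
      , num_rows
from
        user_tab_partitions
where
        table_name in ('T_1_PART', 'T_2_PART')
order by
        table_name
      , partition_position
;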

break on dfo_number nodup on tq_id nodup on server_type skip 1 nodup on instance nodup

-- compute sum label Total of num_rows on server_type

select
        /*dfo_number
      , */tq_id
      , cast(server_type as varchar2(10)) as server_type
      , instance
      , cast(process as varchar2(8)) as process
      , num_rows
      , round(ratio_to_report(num_rows) over (partition by dfo_number, tq_id, server_type) * 100) as "%"
      , cast(rpad('#', round(num_rows * 10 / nullif(max(num_rows) over (partition by dfo_number, tq_id, server_type), 0)), '#') as varchar2(10)) as graph
      , round(bytes / 1024 / 1024) as mb
      , round(bytes / nullif(num_rows, 0)) as "bytes/row"
from
        v$pq_tqstat
order by
        dfo_number
      , tq_id
      , server_type desc
      , instance
      , process
;