Friday, November 29, 2013

Accelerate skewed joins

Sponsored by PRISE Ltd.
www.prisetools.com

How to "re-parallelize" skewed joins

Case description

Assume that we have 1M customers and 4M transactions, and that our top customer produces 2.5% of all transactions. The others produce the remaining 97.5% of transactions approximately evenly.
Scroll down to the bottom of the post for sample table and data generator SQL.

Our task is to join the "Customer" and "Transact" tables on Customer_id.

The join

SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
GROUP BY 1;


We experience quite slow execution.
In Viewpoint we see that only one AMP is working, while the others are idle.

What is the problem?
From a "joinability" point of view, the Transact table consists of two separate subsets:
  • "Peak" part (records of the top customer(s))
    Very few customers with very many Transact records each. A product join would be cost-effective.
  • "Even" part (records of the other customers)
    Many customers, each with only a few, evenly spread Transact records. A merge join would be ideal.
Unfortunately the Optimizer has to decide: only one join type can be chosen for the whole join. It will choose the merge join, which consumes far less CPU time.

Execution plan looks like this:

 This query is optimized using type 2 profile T2_Linux64, profileid 21.
  1) First, we lock a distinct D_DB_TMP."pseudo table" for read on a
     RowHash to prevent global deadlock for D_DB_TMP.t.
  2) Next, we lock a distinct D_DB_TMP."pseudo table" for read on a
     RowHash to prevent global deadlock for D_DB_TMP.c.
  3) We lock D_DB_TMP.t for read, and we lock D_DB_TMP.c for read.
  4) We do an all-AMPs RETRIEVE step from D_DB_TMP.t by way of an
     all-rows scan with a condition of ("NOT (D_DB_TMP.t.Customer_ID IS
     NULL)") into Spool 4 (all_amps), which is redistributed by the
     hash code of (D_DB_TMP.t.Customer_ID) to all AMPs.  Then we do a
     SORT to order Spool 4 by row hash.  The size of Spool 4 is
     estimated with low confidence to be 125 rows (2,125 bytes).  The
     estimated time for this step is 0.01 seconds.
  5) We do an all-AMPs JOIN step from Spool 4 (Last Use) by way of a
     RowHash match scan, which is joined to D_DB_TMP.c by way of a
     RowHash match scan.  Spool 4 and D_DB_TMP.c are joined using a
     merge join, with a join condition of ("D_DB_TMP.c.Customer_ID =
     Customer_ID").  The result goes into Spool 3 (all_amps), which is
     built locally on the AMPs.  The size of Spool 3 is estimated with
     index join confidence to be 125 rows (10,375 bytes).  The
     estimated time for this step is 0.02 seconds.
  6) We do an all-AMPs SUM step to aggregate from Spool 3 (Last Use) by
     way of an all-rows scan , grouping by field1 (
     D_DB_TMP.c.Customer_name).  Aggregate Intermediate Results are
     computed globally, then placed in Spool 5.  The size of Spool 5 is
     estimated with no confidence to be 94 rows (14,758 bytes).  The
     estimated time for this step is 0.02 seconds.
  7) We do an all-AMPs RETRIEVE step from Spool 5 (Last Use) by way of
     an all-rows scan into Spool 1 (all_amps), which is built locally
     on the AMPs.  The size of Spool 1 is estimated with no confidence
     to be 94 rows (8,742 bytes).  The estimated time for this step is
     0.02 seconds.
  8) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 0.07 seconds.

How to identify


If you experience an extremely asymmetric AMP load, you can suspect this case.
Find highly skewed JOIN steps in DBQL (set all logging options on):

select top 50
  a.MaxAMPCPUTime * (hashamp()+1) / nullifzero(a.CPUTime) Skw,
  a.CPUTime,
  a.MaxAMPCPUTime * (hashamp()+1) CoveringCPUTime,
b.*
from dbc.dbqlsteptbl a
join dbc.dbqlogtbl b on a.procid=b.procid and a.queryid=b.queryid
where
StepName='JIN'
and CPUtime > 100
and Skw > 2
order by CoveringCPUTime desc;




(Note: Covering CPU time is <No-of-AMPs> * <Max AMP's CPU time>. Effectively this much CPU is consumed, because with an asymmetric load all the other AMPs have to wait for the busiest one.)
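
The two derived columns of the query above are simple arithmetic. A minimal Python sketch of the same calculation follows; the function name and the sample CPU figures are made up for illustration and are not DBQL output.

```python
def skew_metrics(max_amp_cpu, total_cpu, num_amps):
    """Return (skew factor, covering CPU time) for one query step.

    max_amp_cpu: CPU seconds used by the busiest AMP (MaxAMPCPUTime)
    total_cpu:   CPU seconds summed over all AMPs (CPUTime)
    num_amps:    number of AMPs in the system (hashamp() + 1)
    """
    covering = max_amp_cpu * num_amps
    # Like nullifzero() in the SQL: no skew value when total CPU is zero.
    skew = covering / total_cpu if total_cpu else None
    return skew, covering

# Hypothetical step on a 125-AMP system:
# the busiest AMP used 40 s, all AMPs together used 200 s.
skew, covering = skew_metrics(40.0, 200.0, 125)
print(skew, covering)  # 25.0 5000.0
```

A perfectly even step gives a skew factor of 1; the higher the value, the more of the system sits idle while one AMP works.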

Or if you suspect a specific query, check the demography of the join field(s) in the "big" table:

SELECT TOP 100 <Join_field>, count(*) Nbr
FROM <Big_table> GROUP BY 1 ORDER BY 2 DESC;


If the top occurrences are dramatically larger than the others (or than the average), this is likely your case.
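
One possible way to make "dramatically larger" concrete is to compare each key's row count with the average number of rows an AMP would receive. The sketch below does that in Python. The threshold (a key is flagged when its rows alone amount to at least one average AMP share) and the function name are assumptions of this sketch, not a Teradata rule; the counts are the ones from this post's sample data.

```python
def salient_keys(key_counts, total_rows, num_amps, factor=1.0, max_keys=5):
    """Return join keys whose row count alone rivals an average AMP's share.

    key_counts: dict {join key: row count}, e.g. from the TOP 100 query
    factor:     assumed threshold multiplier (1.0 = one average AMP share)
    max_keys:   cap on the number of keys returned
    """
    avg_per_amp = total_rows / num_amps
    ranked = sorted(key_counts.items(), key=lambda kv: kv[1], reverse=True)
    return [key for key, n in ranked if n >= factor * avg_per_amp][:max_keys]

# Top join-key counts of the sample Transact table, 125 AMPs, ~4.1M rows.
top = {345: 100004, 499873: 4, 677423: 4, 187236: 4, 23482: 4}
print(salient_keys(top, total_rows=4_100_000, num_amps=125))  # [345]
```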


Solution

Break the query into two parts: join the top customer(s) separately, then all the others, and finally union the results. (Sometimes an additional modification is also required, if the embedding operation(s), here the GROUP BY, is/are not decomposable along the same parameter.)
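
The decomposability caveat can be illustrated with a small simulation. The Python sketch below emulates "join, then count(*) grouped by name" on toy data; the helper names and the data are invented for illustration. With unique customer names the split-and-union result equals the single-query result, but if a top customer shares its name with another customer, the union returns that name twice.

```python
from collections import Counter

def count_by_name(customers, transactions, keep):
    """Join on customer id, then count(*) grouped by name, for ids passing keep()."""
    return Counter(customers[cid] for cid in transactions if keep(cid))

def split_count(customers, transactions, top_ids):
    """Emulate the two-branch UNION ALL: peak rows plus even rows."""
    peak = count_by_name(customers, transactions, lambda c: c in top_ids)
    even = count_by_name(customers, transactions, lambda c: c not in top_ids)
    return sorted(list(peak.items()) + list(even.items()))

transactions = [345] * 6 + [7, 7, 8]   # customer id of each transaction

# Unique names: the split result equals the single-query result.
unique = {345: "Big", 7: "Ann", 8: "Bob"}
single = sorted(count_by_name(unique, transactions, lambda c: True).items())
assert split_count(unique, transactions, {345}) == single

# Customers 345 and 7 share a name: the split returns that name twice.
shared = {345: "Ann", 7: "Ann", 8: "Bob"}
print(split_count(shared, transactions, {345}))
# [('Ann', 2), ('Ann', 6), ('Bob', 1)]
```

In the second case the partial counts would have to be summed again per name in an outer aggregation to get the same answer as the original query.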
First we have to identify the top customer(s):

SELECT TOP 5 Customer_id, count(*) Nbr
FROM Transact GROUP BY 1 ORDER BY 2 DESC;

Customer_id          Nbr
------------------------
        345       100004
     499873            4
     677423            4
     187236            4
      23482            4
     
Replace the original query with this one:

SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
where t.Customer_id in (345)  

/*
   ID of the top Customer(s). 
   If more customers are salient, list them, but max ~5
*/
GROUP BY 1
UNION ALL
SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
where t.Customer_id not in (345)  -- Same customer(s)
GROUP BY 1
;

Be sure that Customer.Customer_id, Transact.Transaction_id and Transact.Customer_id have statistics!

This query is more complex, has more steps and scans the Transact table twice, but it runs much faster; you can check it yourself.
But why? And how do we determine which "top" customers are worth handling separately?
Read on.

Explanation

Calculation


Let's do some maths:
Assume that we are on a 125-AMP system.
The Customer table contains 1M records with unique IDs.
We have ~4.1M records in the Transact table: 100k for the top customer (ID=345) and 4 for each of the other customers. This matches the 2.5% we assumed above.

If the Transact table is redistributed on hash(Customer_id), we will get ~33k records on each AMP, except on AMP(hash(345)), where we will get ~133k (33k + 100k).
That means that this AMP will process ~4x more data than the others, and therefore runs ~4x longer.
In other words, for 75% of this JOIN step's duration 124 AMPs will DO NOTHING for the query.

Moreover, the preparation and the subsequent steps are also problematic: the JOIN is prepared by a redistribution that produces a strongly skewed spool, and the JOIN's result stays local on the AMPs, so it is skewed as well.
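
The figures above can be reproduced with a few lines of Python. This is only a rough sketch: it assumes the row count per AMP is a fair proxy for the step's run time, and it takes the "even" row count from the sample generator at the end of the post (22 doublings).

```python
NUM_AMPS = 125
EVEN_ROWS = 2 ** 22     # evenly spread rows, per the sample generator
PEAK_ROWS = 100_000     # rows of the top customer (ID=345)

# After redistribution by hash(Customer_id) every AMP gets an even share,
# and one AMP additionally receives all rows of the top customer.
even_per_amp = EVEN_ROWS / NUM_AMPS
peak_amp = even_per_amp + PEAK_ROWS

ratio = peak_amp / even_per_amp   # how much more the hot AMP processes
idle_fraction = 1 - 1 / ratio     # share of the step the other AMPs wait

print(round(even_per_amp), round(peak_amp))      # 33554 133554
print(round(ratio, 1), round(idle_fraction, 2))  # 4.0 0.75
```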

Optimized version

This query consumes moderately more CPU, but the work is distributed evenly across the AMPs, utilizing Teradata's full parallel capability.
It also contains a product join, but that is no problem: it joins 1 Customer record to the selected 100k Transact records, which is lightning fast.
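
A rough Python estimate of the per-AMP load in the broken-up query follows. It assumes that the top customer's rows are spread evenly over the AMPs (Transact is hashed on Transaction_id and the peak branch joins it without redistribution) and that row counts approximate run time; the row figures come from the sample generator.

```python
NUM_AMPS = 125
EVEN_ROWS = 2 ** 22     # evenly spread rows, per the sample generator
PEAK_ROWS = 100_000     # rows of the top customer (ID=345)

# Peak branch: each AMP joins its local share of the top customer's rows
# to the single Customer row that was duplicated to all AMPs.
peak_per_amp = PEAK_ROWS / NUM_AMPS
# Even branch: redistributed by hash(Customer_id), without the hot key.
even_per_amp = EVEN_ROWS / NUM_AMPS

busiest_split = peak_per_amp + even_per_amp   # any AMP, both branches
busiest_skewed = even_per_amp + PEAK_ROWS     # hot AMP in the original plan

print(round(peak_per_amp), round(busiest_split))  # 800 34354
print(round(busiest_skewed / busiest_split, 1))   # 3.9
```

Under these assumptions the busiest AMP handles almost four times fewer rows than in the original plan, even though the table is scanned twice.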


Look at the execution plan of the broken-up query:


 This query is optimized using type 2 profile T2_Linux64, profileid 21.
  1) First, we lock a distinct D_DB_TMP."pseudo table" for read on a
     RowHash to prevent global deadlock for D_DB_TMP.t.
  2) Next, we lock a distinct D_DB_TMP."pseudo table" for read on a
     RowHash to prevent global deadlock for D_DB_TMP.c.
  3) We lock D_DB_TMP.t for read, and we lock D_DB_TMP.c for read.
  4) We do a single-AMP RETRIEVE step from D_DB_TMP.c by way of the
     unique primary index "D_DB_TMP.c.Customer_ID = 345" with no
     residual conditions into Spool 4 (all_amps), which is duplicated
     on all AMPs.  The size of Spool 4 is estimated with high
     confidence to be 125 rows (10,625 bytes).  The estimated time for
     this step is 0.01 seconds.
  5) We do an all-AMPs JOIN step from Spool 4 (Last Use) by way of an
     all-rows scan, which is joined to D_DB_TMP.t by way of an all-rows
     scan with a condition of ("D_DB_TMP.t.Customer_ID = 345").  Spool
     4 and D_DB_TMP.t are joined using a product join, with a join
     condition of ("Customer_ID = D_DB_TMP.t.Customer_ID").  The result
     goes into Spool 3 (all_amps), which is built locally on the AMPs.
     The size of Spool 3 is estimated with low confidence to be 99,670
     rows (8,272,610 bytes).  The estimated time for this step is 0.09
     seconds.
  6) We do an all-AMPs SUM step to aggregate from Spool 3 (Last Use) by
     way of an all-rows scan , grouping by field1 (
     D_DB_TMP.c.Customer_name).  Aggregate Intermediate Results are
     computed globally, then placed in Spool 5.  The size of Spool 5 is
     estimated with no confidence to be 74,753 rows (11,736,221 bytes).
     The estimated time for this step is 0.20 seconds.
  7) We execute the following steps in parallel.
       1) We do an all-AMPs RETRIEVE step from Spool 5 (Last Use) by
          way of an all-rows scan into Spool 1 (all_amps), which is
          built locally on the AMPs.  The size of Spool 1 is estimated
          with no confidence to be 74,753 rows (22,052,135 bytes).  The
          estimated time for this step is 0.02 seconds.
       2) We do an all-AMPs RETRIEVE step from D_DB_TMP.t by way of an
          all-rows scan with a condition of ("D_DB_TMP.t.Customer_ID <>
          3454") into Spool 9 (all_amps), which is redistributed by the
          hash code of (D_DB_TMP.t.Customer_ID) to all AMPs.  The size
          of Spool 9 is estimated with high confidence to be 4,294,230
          rows (73,001,910 bytes).  The estimated time for this step is
          1.80 seconds.
  8) We do an all-AMPs JOIN step from D_DB_TMP.c by way of an all-rows
     scan with a condition of ("D_DB_TMP.c.Customer_ID <> 3454"), which
     is joined to Spool 9 (Last Use) by way of an all-rows scan.
     D_DB_TMP.c and Spool 9 are joined using a single partition hash
     join, with a join condition of ("D_DB_TMP.c.Customer_ID =
     Customer_ID").  The result goes into Spool 8 (all_amps), which is
     built locally on the AMPs.  The size of Spool 8 is estimated with
     low confidence to be 4,294,230 rows (356,421,090 bytes).  The
     estimated time for this step is 0.72 seconds.
  9) We do an all-AMPs SUM step to aggregate from Spool 8 (Last Use) by
     way of an all-rows scan , grouping by field1 (
     D_DB_TMP.c.Customer_name).  Aggregate Intermediate Results are
     computed globally, then placed in Spool 10.  The size of Spool 10
     is estimated with no confidence to be 3,220,673 rows (505,645,661
     bytes).  The estimated time for this step is 8.46 seconds.
 10) We do an all-AMPs RETRIEVE step from Spool 10 (Last Use) by way of
     an all-rows scan into Spool 1 (all_amps), which is built locally
     on the AMPs.  The size of Spool 1 is estimated with no confidence
     to be 3,295,426 rows (972,150,670 bytes).  The estimated time for
     this step is 0.32 seconds.
 11) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 11.60 seconds.


Sample structures

The table structures (simplified for the example):


CREATE TABLE Customer
(
  Customer_ID   INTEGER
, Customer_name VARCHAR(200)
)
UNIQUE PRIMARY INDEX (Customer_id)
;

insert into Customer values (1,'Cust-1');
Run the following statement 20 times (each run doubles the table, giving 2^20 = 1,048,576 rows):

insert into Customer select mx + sum(1) over (order by Customer_id rows unbounded preceding) id, 'Cust-' || trim(id) from Customer cross join (select max(Customer_id) mx from Customer) x;

collect statistics using sample on customer column (Customer_id);

CREATE TABLE Transact
(
  Transaction_ID   INTEGER
, Customer_ID      INTEGER
)
UNIQUE PRIMARY INDEX (Transaction_id)
;

insert into Transact values (1,1);
Run the next statement 22 times (each run doubles the table, giving 2^22 = 4,194,304 rows):

insert into Transact select mx + sum(1) over (order by Transaction_id rows unbounded preceding) id, id mod 1000000 from Transact cross join (select max(Transaction_id) mx from Transact) x;

insert into Transact select mx + sum(1) over (order by Transaction_id rows unbounded preceding) id, 345 from Transact t cross join (select max(Transaction_id) mx from Transact) x where t.Transaction_id < 100000;

collect statistics using sample on Transact column (Customer_id);

collect statistics using sample on Transact column (Transaction_id) ;
