diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
index e8c7486..64dc48c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
@@ -28,8 +28,6 @@
 import java.util.Set;
 import java.util.Stack;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -49,12 +47,12 @@
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -70,6 +68,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * SkewJoinOptimizer.
@@ -283,10 +283,11 @@ private boolean getTableScanOps(
    * @param op The join operator being optimized
    * @param tableScanOpsForJoin table scan operators which are parents of the join operator
    * @return map<join keys, skewed values>.
+   * @throws SemanticException
    */
   private Map<List<ExprNodeDescEqualityWrapper>, List<List<String>>> getSkewedValues(
-    Operator<? extends OperatorDesc> op, List<TableScanOperator> tableScanOpsForJoin) {
+    Operator<? extends OperatorDesc> op, List<TableScanOperator> tableScanOpsForJoin) throws SemanticException {
 
     Map<List<ExprNodeDescEqualityWrapper>, List<List<String>>> skewDataReturn =
       new HashMap<List<ExprNodeDescEqualityWrapper>, List<List<String>>>();
 
@@ -299,6 +300,7 @@ private boolean getTableScanOps(
       ReduceSinkDesc rsDesc = ((ReduceSinkOperator) reduceSinkOp).getConf();
 
       if (rsDesc.getKeyCols() != null) {
+        TableScanOperator tableScanOp = null;
         Table table = null;
         // Find the skew information corresponding to the table
         List<String> skewedColumns = null;
@@ -321,7 +323,9 @@ private boolean getTableScanOps(
         if (keyColDesc instanceof ExprNodeColumnDesc) {
           keyCol = (ExprNodeColumnDesc) keyColDesc;
           if (table == null) {
-            table = getTable(parseContext, reduceSinkOp, tableScanOpsForJoin);
+            tableScanOp = getTableScanOperator(parseContext, reduceSinkOp, tableScanOpsForJoin);
+            table =
+              tableScanOp == null ? null : tableScanOp.getConf().getTableMetadata();
             skewedColumns = table == null ? null : table.getSkewedColNames();
             // No skew on the table to take care of
@@ -332,10 +336,13 @@ private boolean getTableScanOps(
             skewedValueList = table == null ? null : table.getSkewedColValues();
           }
-          int pos = skewedColumns.indexOf(keyCol.getColumn());
+          ExprNodeDesc keyColOrigin = ExprNodeDescUtils.backtrack(keyCol,
+            reduceSinkOp, tableScanOp);
+          int pos = keyColOrigin == null || !(keyColOrigin instanceof ExprNodeColumnDesc) ?
+            -1 : skewedColumns.indexOf(((ExprNodeColumnDesc)keyColOrigin).getColumn());
           if ((pos >= 0) && (!positionSkewedKeys.contains(pos))) {
             positionSkewedKeys.add(pos);
-            ExprNodeColumnDesc keyColClone = (ExprNodeColumnDesc) keyCol.clone();
+            ExprNodeColumnDesc keyColClone = (ExprNodeColumnDesc) keyColOrigin.clone();
             keyColClone.setTabAlias(null);
             joinKeysSkewedCols.add(new ExprNodeDescEqualityWrapper(keyColClone));
           }
@@ -386,9 +393,9 @@ private boolean getTableScanOps(
   }
 
   /**
-   * Get the table alias from the candidate table scans.
+   * Get the table scan.
    */
-  private Table getTable(
+  private TableScanOperator getTableScanOperator(
     ParseContext parseContext,
     Operator<? extends OperatorDesc> op,
     List<TableScanOperator> tableScanOpsForJoin) {
@@ -396,7 +403,7 @@ private Table getTable(
     if (op instanceof TableScanOperator) {
       TableScanOperator tsOp = (TableScanOperator)op;
      if (tableScanOpsForJoin.contains(tsOp)) {
-        return tsOp.getConf().getTableMetadata();
+        return tsOp;
       }
     }
     if ((op.getParentOperators() == null) || (op.getParentOperators().isEmpty()) ||
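Reviewer note (not part of the patch): the heart of the fix is that the old code compared the ReduceSink key column's name directly against the table's skewed column names, so when a join input is a subquery that renames its columns (as in the new test below, `key AS k`), the lookup saw only the internal alias and the compile-time skew optimization silently never fired. The patch instead backtracks each key expression to its originating TableScan with ExprNodeDescUtils.backtrack and matches the original column name. A minimal standalone sketch of that idea, using plain maps in place of Hive's operator and ExprNodeDesc classes (all names in it are illustrative, not Hive API):

```java
import java.util.List;
import java.util.Map;

/**
 * Toy model of the backtracking step: walk a column reference back through
 * the projections between a ReduceSink and its TableScan, then look the
 * original name up in the table's skewed-column list.
 */
public class BacktrackSketch {

  /** Each map stands in for one operator's output-to-input column mapping. */
  static String backtrack(String column, List<Map<String, String>> projections) {
    for (Map<String, String> mapping : projections) {
      column = mapping.get(column);
      if (column == null) {
        return null; // not a plain column reference all the way down
      }
    }
    return column;
  }

  public static void main(String[] args) {
    // SELECT key AS k FROM t1: the subquery renames "key" to "k", and the
    // ReduceSink refers to it by the internal name "_col0".
    List<Map<String, String>> projections =
        List.of(Map.of("_col0", "k"), Map.of("k", "key"));
    List<String> skewedColumns = List.of("key");

    // Old behaviour: the internal name never matches the skewed column.
    System.out.println(skewedColumns.indexOf("_col0"));                         // -1
    // Patched behaviour: backtrack first, then match.
    System.out.println(skewedColumns.indexOf(backtrack("_col0", projections))); // 0
  }
}
```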
diff --git ql/src/test/queries/clientpositive/skewjoinopt21.q ql/src/test/queries/clientpositive/skewjoinopt21.q
new file mode 100644
index 0000000..76dde57
--- /dev/null
+++ ql/src/test/queries/clientpositive/skewjoinopt21.q
@@ -0,0 +1,30 @@
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple join query with skew on both the tables on the join key
+-- adding an order by at the end to make the results deterministic
+
+EXPLAIN
+SELECT a.*, b.*
+FROM
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k;
+
+SELECT a.*, b.*
+FROM
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+ORDER BY a.k, b.k, a.v, b.v;
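Reviewer note: T1 declares key = 2 as skewed and T2 declares key = 3, and the expected plan below shows the optimizer unioning the two skew lists into one predicate per branch: (key = '2') or (key = '3') on the skewed branch, and its negation on the residual branch, each conjoined with key is not null. A rough sketch of that predicate construction, with strings standing in for the ExprNodeDesc trees the optimizer actually builds (the class and method names here are illustrative):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Sketch of how the compile-time skew join rewrite derives its two branch
 * predicates from the skewed values declared on both join inputs. Strings
 * are used only to mirror the predicates visible in the EXPLAIN output.
 */
public class SkewPredicateSketch {

  static String skewBranch(String column, Set<String> skewedValues) {
    return skewedValues.stream()
        .map(v -> "(" + column + " = '" + v + "')")
        .collect(Collectors.joining(" or "));
  }

  public static void main(String[] args) {
    // T1 is skewed on key=2, T2 on key=3; the union drives both branches.
    Set<String> skewedValues = new LinkedHashSet<>(List.of("2", "3"));
    String skewed = skewBranch("key", skewedValues);

    System.out.println("key is not null and (" + skewed + ")");       // Stage-1 filter
    System.out.println("key is not null and (not (" + skewed + "))"); // Stage-4 filter
  }
}
```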
diff --git ql/src/test/results/clientpositive/skewjoinopt21.q.out ql/src/test/results/clientpositive/skewjoinopt21.q.out
new file mode 100644
index 0000000..d58d694
--- /dev/null
+++ ql/src/test/results/clientpositive/skewjoinopt21.q.out
@@ -0,0 +1,230 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T2
+POSTHOOK: query: CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T2
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t2
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t2
+PREHOOK: query: -- a simple join query with skew on both the tables on the join key
+-- adding an order by at the end to make the results deterministic
+
+EXPLAIN
+SELECT a.*, b.*
+FROM
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+PREHOOK: type: QUERY
+POSTHOOK: query: -- a simple join query with skew on both the tables on the join key
+-- adding an order by at the end to make the results deterministic
+
+EXPLAIN
+SELECT a.*, b.*
+FROM
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1, Stage-4
+  Stage-4 is a root stage
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and ((key = '2') or (key = '3'))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+          TableScan
+            alias: t2
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and ((key = '2') or (key = '3'))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          keys:
+            0 _col0 (type: string)
+            1 _col0 (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Union
+              Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          TableScan
+            Union
+              Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 2 Data size: 66 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-4
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and (not ((key = '2') or (key = '3')))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+          TableScan
+            alias: t2
+            Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and (not ((key = '2') or (key = '3')))) (type: boolean)
+              Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), val (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 1 Data size: 33 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT a.*, b.*
+FROM
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+ORDER BY a.k, b.k, a.v, b.v
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a.*, b.*
+FROM
+  (SELECT key as k, val as v FROM T1) a
+  JOIN
+  (SELECT key as k, val as v FROM T2) b
+ON a.k = b.k
+ORDER BY a.k, b.k, a.v, b.v
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t2
+#### A masked pattern was here ####
+2	12	2	22
+3	13	3	13
+8	18	8	18
+8	18	8	18
+8	28	8	18
+8	28	8	18
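Reviewer note: the plan above has the canonical compile-time skew join shape. Stage-1 joins only the declared skewed keys, Stage-4 joins all remaining keys, and Stage-2 unions the two outputs, so the heavy keys no longer funnel through the same reducers as everything else. A self-contained toy (hypothetical Row type and helpers, not Hive code, seeded with only the rows of T1.txt/T2.txt that participate in the join) showing that split-join-union reproduces exactly the six result rows listed above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Toy demonstration that joining the skewed and residual partitions
 * separately and unioning the results equals one big join.
 */
public class SkewJoinUnionSketch {

  record Row(String key, String val) {}

  /** Naive nested-loop inner join on key, emitting tab-separated rows. */
  static List<String> join(List<Row> left, List<Row> right) {
    List<String> out = new ArrayList<>();
    for (Row l : left) {
      for (Row r : right) {
        if (l.key().equals(r.key())) {
          out.add(String.join("\t", l.key(), l.val(), r.key(), r.val()));
        }
      }
    }
    return out;
  }

  /** Keep either the skewed-key rows or the residual rows. */
  static List<Row> part(List<Row> rows, Set<String> skewed, boolean keepSkewed) {
    return rows.stream().filter(r -> skewed.contains(r.key()) == keepSkewed).toList();
  }

  public static void main(String[] args) {
    List<Row> t1 = List.of(new Row("2", "12"), new Row("3", "13"),
        new Row("8", "18"), new Row("8", "28"));
    List<Row> t2 = List.of(new Row("2", "22"), new Row("3", "13"),
        new Row("8", "18"), new Row("8", "18"));
    Set<String> skewed = Set.of("2", "3"); // from SKEWED BY ... ON

    List<String> result = new ArrayList<>();
    result.addAll(join(part(t1, skewed, true), part(t2, skewed, true)));   // Stage-1
    result.addAll(join(part(t1, skewed, false), part(t2, skewed, false))); // Stage-4
    result.forEach(System.out::println);                                   // Stage-2 union
  }
}
```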