diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cd095d6..e53e023 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -592,6 +592,8 @@ // optimize skewed join by changing the query plan at compile time HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME("hive.optimize.skewjoin.compiletime", false), + HIVE_OPTIMIZE_INLINE_SKEWJOIN("hive.optimize.inline.skewjoin", false), + // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index cbc3cd2..f74982b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -366,6 +366,13 @@ UNSUPPORTED_SUBQUERY_EXPRESSION(10249, "Unsupported SubQuery Expression"), INVALID_SUBQUERY_EXPRESSION(10250, "Invalid SubQuery expression"), + SKEWEXPR_HAS_NO_TABLEALIAS(10251, "Failed to find alias for skew expression"), + SKEWEXPR_IS_NOT_BOOLEAN_TYPE(10252, + "Skew expression {0} has {1} type, which should be a boolean type", true), + SKEWEXPR_IS_NOT_FOUND_IN_JOIN_CONDITION(10253, + "Skew expression {0} is not found in join condition", true), + SUM_OF_SKEW_CLUSTER_SIZE_IS_TOO_BIG(10254, "Sum of skew cluster size is too big."), + SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " + "It may have crashed with an error."), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index c5fc529..12f3479 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -895,7 +895,7 @@ public String dump(int level, HashSet seenOpts) { * Initialize an array of ExprNodeEvaluator and return the result * ObjectInspectors. */ - protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, + public static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, ObjectInspector rowInspector) throws HiveException { ObjectInspector[] result = new ObjectInspector[evals.length]; for (int i = 0; i < evals.length; i++) { @@ -908,7 +908,7 @@ public String dump(int level, HashSet seenOpts) { * Initialize an array of ExprNodeEvaluator from start, for specified length * and return the result ObjectInspectors.
*/ - protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, + public static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, int start, int length, ObjectInspector rowInspector) throws HiveException { ObjectInspector[] result = new ObjectInspector[length]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index c378dc7..85664d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SkewContext; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDeException; @@ -75,6 +76,8 @@ */ protected transient ExprNodeEvaluator[] partitionEval; + protected transient SkewContext skewContext; + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is // ready protected transient Serializer keySerializer; @@ -102,7 +105,7 @@ public void setOutputCollector(OutputCollector _out) { protected transient TopNHash reducerHash = new TopNHash(); @Override protected void initializeOp(Configuration hconf) throws HiveException { - + conf.setNumReducers(hconf.getInt("mapred.reduce.tasks", -1)); try { List keys = conf.getKeyCols(); keyEval = new ExprNodeEvaluator[keys.size()]; @@ -110,6 +113,10 @@ protected void initializeOp(Configuration hconf) throws HiveException { for (ExprNodeDesc e : keys) { keyEval[i++] = ExprNodeEvaluatorFactory.get(e); } + skewContext = conf.getSkewContext(); + if (skewContext != null) { + skewContext.initialize(); + } numDistributionKeys = conf.getNumDistributionKeys(); distinctColIndices = conf.getDistinctColumnIndices(); @@ -183,7 +190,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { // TODO: we only ever use one row of these at a time. Why do we need to cache multiple? protected transient Object[][] cachedKeys; boolean firstRow; - protected transient Random random; /** * Initializes array of ExprNodeEvaluator. Adds Union field for distinct @@ -244,6 +250,10 @@ public void processOp(Object row, int tag) throws HiveException { valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf .getOutputValueColumnNames(), rowInspector); partitionObjectInspectors = initEvaluators(partitionEval, rowInspector); + if (skewContext != null && + !skewContext.initializeKey(keyWritable, rowInspector, conf.getNumReducers())) { + skewContext = null; // disable + } int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys; cachedKeys = new Object[numKeys][keyLen]; @@ -315,6 +325,10 @@ private void populateCachedDistinctKeys(Object row, int index) throws HiveExcept } private int computeHashCode(Object row) throws HiveException { + if (skewContext != null && skewContext.evaluateSkew(row, keyWritable)) { + keyWritable.setHashCode(skewContext.isDriver(keyWritable) ? nextRandom() : -1); + return 0; + } // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { @@ -322,10 +336,7 @@ private int computeHashCode(Object row) throws HiveException { // load balance. 
If the requirement is to have a single reducer, we should set // the number of reducers to 1. // Use a constant seed to make the code deterministic. - if (random == null) { - random = new Random(12345); - } - keyHashCode = random.nextInt(); + keyHashCode = nextRandom(); } else { for (int i = 0; i < partitionEval.length; i++) { Object o = partitionEval[i].evaluate(row); @@ -336,6 +347,16 @@ private int computeHashCode(Object row) throws HiveException { return keyHashCode; } + private static final int RANDOM_SEED = 12345; + private transient Random random; + + protected int nextRandom() { + if (random == null) { + random = new Random(RANDOM_SEED); + } + return random.nextInt(); + } + // Serialize the keys and append the tag protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException { BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector); @@ -357,11 +378,11 @@ public void collect(byte[] key, byte[] value, int hash) throws IOException { collect(keyWritable, valueWritable); } - protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { + protected void collect(HiveKey keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called if (null != out) { - out.collect(keyWritable, valueWritable); + keyWritable.collect(out, valueWritable); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index d0be73e..a31d3e3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.io.IOPrepareCache; +import org.apache.hadoop.hive.ql.io.SkewedKeyPartitioner; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -235,11 +236,15 @@ public int execute(DriverContext driverContext) { job.setMapOutputKeyClass(HiveKey.class); job.setMapOutputValueClass(BytesWritable.class); - try { - job.setPartitionerClass((Class) (Class.forName(HiveConf.getVar(job, - HiveConf.ConfVars.HIVEPARTITIONER)))); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e.getMessage()); + if (mWork.isUseInlineSkewContext()) { + job.setPartitionerClass(SkewedKeyPartitioner.class); + } else { + try { + job.setPartitionerClass((Class) (Class.forName(HiveConf.getVar(job, + HiveConf.ConfVars.HIVEPARTITIONER)))); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getMessage()); + } } if (mWork.getNumMapTasks() != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java index e60fa9e..141deb5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.io.IOException; -import java.util.Arrays; -import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import 
org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -43,8 +40,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -// import org.apache.hadoop.util.StringUtils; public class VectorReduceSinkOperator extends ReduceSinkOperator { @@ -379,10 +374,7 @@ private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveExc // load balance. If the requirement is to have a single reducer, we should set // the number of reducers to 1. // Use a constant seed to make the code deterministic. - if (random == null) { - random = new Random(12345); - } - keyHashCode = random.nextInt(); + keyHashCode = nextRandom(); } else { for (int p = 0; p < partitionEval.length; p++) { ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()]; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java index f9cf2bd..fe967f2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java @@ -20,6 +20,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapred.OutputCollector; + +import java.io.IOException; /** * HiveKey is a simple wrapper on Text which allows us to set the hashCode @@ -34,6 +37,13 @@ private transient int distKeyLength; + transient int skewGroup = -1; + transient int skewOffset = -1; + + transient boolean[] drivers; + transient int[] skewGroupSize; // size of partition for skewed group + transient int[] skewGroupStart; // starting number of partition for skewed group + public HiveKey() { hashCodeValid = false; } @@ -49,6 +59,70 @@ public void setHashCode(int myHashCode) { hashCode = myHashCode; } + public void setSkewGroup(int skewGroup) { + this.skewGroup = skewGroup; + } + + public int getSkewGroup() { + return skewGroup; + } + + public boolean initializeSkew(int numReduceTasks, boolean[] skewDrivers, int[] skewClusters) { + drivers = skewDrivers; + skewGroupSize = getSkewAssigned(numReduceTasks, skewClusters); + skewGroupStart = getSkewOffsets(numReduceTasks, skewGroupSize); + return skewGroupSize != null; + } + + public boolean isSkewed() { + return skewGroupSize != null; + } + + @SuppressWarnings("unchecked") + public void collect(OutputCollector out, Object value) throws IOException { + if (skewGroup >= 0 && !drivers[skewGroup] && skewGroupSize[skewGroup] > 1) { + // non-driving skewed key. 
will be propagated to all partitions in the group + for (int i = 0; i < skewGroupSize[skewGroup]; i++) { + skewOffset = i; // this is referenced by SkewedKeyPartitioner + out.collect(this, value); + } + skewOffset = -1; + } else { + out.collect(this, value); + } + } + + // calculates number of reducers for each group + // numReduceTasks : total number of reducers + // skewedSize : size of each group, >= 0 means percent, < 0 means constant number of reducers + // + // remaining reducers are assigned to non-skewed keys (skewGroup = -1) + // if fewer than one reducer remains, skew join is disabled and falls back to normal join + private int[] getSkewAssigned(int numReduceTasks, int[] skewedSize) { + int remain = numReduceTasks; + int[] assigned = new int[skewedSize.length]; + for (int i = 0; i < assigned.length; i++) { + remain -= assigned[i] = skewedSize[i] < 0 ? -skewedSize[i] : + Math.max(1, (int)(numReduceTasks * (skewedSize[i] / 100.0f))); + } + return remain < 1 ? null : assigned; + } + + // calculates start offset for each group + // for the non-skewed group, partition numbers are 0 ~ offset[0] - 1 + // for skew group n, partition numbers are offset[n] ~ offset[n] + assigned[n] - 1 + private int[] getSkewOffsets(int numReduceTasks, int[] assigned) { + if (assigned == null) { + return null; + } + int remain = numReduceTasks; + int[] offsets = new int[assigned.length]; + for (int i = offsets.length - 1; i >= 0; i--) { + offsets[i] = remain -= assigned[i]; + } + return offsets; + } + @Override public int hashCode() { if (!hashCodeValid) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/SkewedKeyPartitioner.java ql/src/java/org/apache/hadoop/hive/ql/io/SkewedKeyPartitioner.java new file mode 100644 index 0000000..bce55b8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/SkewedKeyPartitioner.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +/** + * Assigns partition number by precalculated skewGroup and skewOffset + * + * 1. skewGroup < 0 (non-skewed) + * 2. skewGroup >= 0 and skewOffset >= 0 (skewed, non-driving) + * 3.
skewGroup >= 0 and skewOffset < 0 (skewed, driving) + */ +public class SkewedKeyPartitioner extends DefaultHivePartitioner { + + @Override + public int getPartition(HiveKey key, V value, int numReduceTasks) { + if (!key.isSkewed()) { + // normal partitioning + return super.getPartition(key, value, numReduceTasks); + } + if (key.skewGroup < 0) { + return super.getPartition(key, value, key.skewGroupStart[0]); // case 1 + } + int groupSize = key.skewGroupSize[key.skewGroup]; + int groupStart = key.skewGroupStart[key.skewGroup]; + if (groupSize == 1) { + return groupStart; + } + if (key.skewOffset >= 0) { + return groupStart + key.skewOffset; // case 2 + } + return groupStart + super.getPartition(key, value, groupSize); // case 3 + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/InlineSkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/InlineSkewJoinOptimizer.java new file mode 100644 index 0000000..8f95372 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/InlineSkewJoinOptimizer.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.ql.exec.ByteWritable; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QBJoinTree; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.SkewContext; +import org.apache.hadoop.hive.ql.udf.generic.NumericHistogram; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +public class InlineSkewJoinOptimizer implements Transform { + + private static final Log LOG = LogFactory.getLog(InlineSkewJoinOptimizer.class); + + public ParseContext transform(ParseContext pctx) throws SemanticException { + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "JOIN%"), new InlineSkewJoinProcessor()); + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, new SkewOptContext(pctx)); + GraphWalker ogw = new DefaultGraphWalker(disp); + + List topNodes = new ArrayList(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + return pctx; + } + + private static class SkewOptContext implements NodeProcessorCtx { + private ParseContext parseContext; + public SkewOptContext(ParseContext pctx) { + this.parseContext = pctx; + } + } + + private static class InlineSkewJoinProcessor implements NodeProcessor { + + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + ParseContext pctx = ((SkewOptContext)procCtx).parseContext; + JoinOperator join = (JoinOperator) nd; + QBJoinTree jointTree = pctx.getJoinContext().get(join); + if (jointTree.getSkewExprs() != null && !jointTree.getSkewExprs().isEmpty()) { + return false; // explicit skew context + } + + Operator[] parents = (Operator[]) join.getParentOperators().toArray(); + + // find driver pos, use biggest one if possible + int driver = parents.length - 1; + List streams = jointTree.getStreamAliases(); + if (streams != null && !streams.isEmpty()) { + driver = JoinReorder.getBiggestPos(join, streams); + } + + ReduceSinkOperator rs = (ReduceSinkOperator) parents[driver]; + TableScanOperator ts = findSingleParent(rs, TableScanOperator.class); + if (ts == null || pctx.getTopToTable().get(ts) == null) { + return false; + } + Table table = pctx.getTopToTable().get(ts); + List sources = rs.getConf().getKeyCols(); + + // revert key expressions of RS to columns of TS + List backtrack = ExprNodeDescUtils.backtrack(sources, rs, ts); + + KeyDistribution[] skewness = getSkewness(pctx, table, backtrack); + if (skewness == null || skewness.length == 0) { + return false; + } + + ArrayList[] skewDrivers = new ArrayList[parents.length]; + ArrayList[] skewClusters = new ArrayList[parents.length]; + ArrayList[] skewKeysExprs = new ArrayList[parents.length]; + + for (KeyDistribution skew : skewness) { + if (!skew.expression.getTypeString().equals("boolean")) { + LOG.info("Boolean type expression is needed. " + + skew.expression.getExprString() + " has type of " + skew.expression.getTypeString()); + return false; + } + int skewPercent = (int) (skew.percent * 100); + for (int pos = 0; pos < parents.length; pos++) { + if (skewKeysExprs[pos] == null) { + skewDrivers[pos] = new ArrayList(); + skewClusters[pos] = new ArrayList(); + skewKeysExprs[pos] = new ArrayList(); + } + skewDrivers[pos].add(pos == driver); + skewClusters[pos].add(skewPercent); + if (pos == driver) { + skewKeysExprs[pos].add(skew.expression); + continue; + } + ReduceSinkOperator input = (ReduceSinkOperator)parents[pos]; + List targets = input.getConf().getKeyCols(); + ExprNodeDesc skewKey = ExprNodeDescUtils.replace(skew.expression, sources, targets); + if (skewKey == null) { + LOG.info("Failed to find join condition for " + skewKey); + return false; + } + skewKeysExprs[pos].add(skewKey); + } + } + + // hand over skew context to all RSs for the join + for (int pos = 0; pos < parents.length; pos++) { + SkewContext context = new SkewContext(); + context.setSkewKeys(skewKeysExprs[pos]); + context.setSkewDrivers(skewDrivers[pos]); + context.setSkewClusters(skewClusters[pos]); + ((ReduceSinkOperator) parents[pos]).getConf().setSkewContext(context); + } + return true; + } + + private KeyDistribution[] getSkewness(ParseContext pctx, Table table, + List sources) throws SemanticException { + try { + return getSkewness(Hive.get(pctx.getConf()), table, sources); + } catch (Exception e) { + LOG.info("Failed to access column statistics for skewness", e); + } + return null; + } + + private KeyDistribution[] getSkewness(Hive hive, Table table, List sources) + throws Exception { + + List result = new ArrayList(); + for (ExprNodeDesc source : sources) { + if (!(source instanceof ExprNodeColumnDesc)) { + continue; + } + ExprNodeColumnDesc exprCol = (ExprNodeColumnDesc) source; + if (!isPrimitiveNumeric(exprCol.getTypeInfo())) { + continue; + } + + PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) exprCol.getTypeInfo(); + ColumnStatistics 
columnStats = hive.getTableColumnStatistics(table.getDbName(), + table.getTableName(), exprCol.getColumn()); + + for (ColumnStatisticsObj stats : columnStats.getStatsObj()) { + NumericHistogram histogram = getNumericHistogram(stats.getStatsData()); + if (histogram == null) { + continue; + } + double[][] published = histogram.publish(); + if (published.length < 10) { + continue; + } + long countSum = 0; + for (int i = 0; i < published.length; i++) { + countSum += (long) published[i][1]; + } + for (int i = 0; i < published.length; i++) { + if (published[i][1] * published.length > (countSum << 1)) { + float percent = (float) (published[i][1] / countSum); + ExprNodeDesc expr = null; + if (i > 0) { + List children = new ArrayList(); + children.add(exprCol.clone()); + children.add(cast(published[i - 1][0], ptype)); + expr = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo(">").getGenericUDF(), children); + percent += published[i - 1][1] / countSum / 2; + } + if (i < published.length - 1) { + List children = new ArrayList(); + children.add(exprCol.clone()); + children.add(cast(published[i + 1][0], ptype)); + ExprNodeDesc expr2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo("<").getGenericUDF(), children); + percent += published[i + 1][1] / countSum / 2; + expr = expr == null ? expr2 : ExprNodeDescUtils.mergePredicates(expr, expr2); + } + result.add(new KeyDistribution(expr, percent)); + } + } + } + } + return result.toArray(new KeyDistribution[result.size()]); + } + + // TODO get histogram from column stats + private NumericHistogram getNumericHistogram(ColumnStatisticsData data) { + return null; + } + } + + private static boolean isPrimitiveNumeric(TypeInfo type) { + if (type.getCategory() != ObjectInspector.Category.PRIMITIVE) { + return false; + } + switch (((PrimitiveTypeInfo) type).getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return true; + } + return false; + } + + private static ExprNodeConstantDesc cast(double value, PrimitiveTypeInfo ptype) { + switch (ptype.getPrimitiveCategory()) { + case BYTE: + return new ExprNodeConstantDesc(ptype, new ByteWritable((byte)value)); + case SHORT: + return new ExprNodeConstantDesc(ptype, new ShortWritable((short)value)); + case INT: + return new ExprNodeConstantDesc(ptype, new IntWritable((int)value)); + case LONG: + return new ExprNodeConstantDesc(ptype, new LongWritable((long)value)); + case FLOAT: + return new ExprNodeConstantDesc(ptype, new FloatWritable((float)value)); + case DOUBLE: + return new ExprNodeConstantDesc(ptype, new DoubleWritable(value)); + } + throw new IllegalStateException("Not supported category " + ptype.getPrimitiveCategory()); + } + + private static T findSingleParent(Operator start, Class target) { + if (start.getParentOperators() == null || start.getParentOperators().size() != 1) { + return null; + } + Operator parent = start.getParentOperators().get(0); + if (parent.getClass() == target) { + return (T) parent; + } + return findSingleParent(parent, target); + } + + public static class KeyDistribution { + private ExprNodeDesc expression; + private float percent; + public KeyDistribution(ExprNodeDesc expression, float percent) { + this.expression = expression; + this.percent = percent; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java index 9238e0e..850ed50 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -50,8 +51,8 @@ * @return The estimated size - 0 (no streamed tables), 1 (streamed tables in * subtree) or 2 (a streamed table) */ - private int getOutputSize(Operator operator, - Set bigTables) { + private static int getOutputSize(Operator operator, + Collection bigTables) { // If a join operator contains a big subtree, there is a chance that its // output is also big, so the output size is 1 (medium) if (operator instanceof JoinOperator) { @@ -114,21 +115,8 @@ private int getOutputSize(Operator operator, * Set of all big tables */ private void reorder(JoinOperator joinOp, Set bigTables) { + int biggestPos = getBiggestPos(joinOp, bigTables); int count = joinOp.getParentOperators().size(); - - // Find the biggest reduce sink - int biggestPos = count - 1; - int biggestSize = getOutputSize( - joinOp.getParentOperators().get(biggestPos), bigTables); - for (int i = 0; i < count - 1; i++) { - int currSize = getOutputSize(joinOp.getParentOperators().get(i), - bigTables); - if (currSize > biggestSize) { - biggestSize = currSize; - biggestPos = i; - } - } - // Reorder tags if need be if (biggestPos != (count - 1)) { Byte[] tagOrder = joinOp.getConf().getTagOrder(); @@ -144,6 +132,21 @@ private void reorder(JoinOperator joinOp, Set bigTables) { } } + public static int getBiggestPos(JoinOperator joinOp, Collection bigTables) { + int count = joinOp.getParentOperators().size(); + // Find the biggest reduce sink + int biggestPos = count - 1; + int biggestSize = getOutputSize(joinOp.getParentOperators().get(biggestPos), bigTables); + for (int i = 0; i < count - 1; i++) { + int currSize = getOutputSize(joinOp.getParentOperators().get(i), bigTables); + if (currSize > biggestSize) { + biggestSize = currSize; + biggestPos = i; + } + } + return biggestPos; + } + /** * Transform the query tree. For each join, check which reduce sink will * output the biggest result (based on STREAMTABLE hints) and give it the diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index f329ae8..752dfd6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -101,6 +101,9 @@ public void initialize(HiveConf hiveConf) { transformations.add(new ReduceSinkDeDuplication()); } transformations.add(new NonBlockingOpDeDupProc()); + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_INLINE_SKEWJOIN)) { + transformations.add(new InlineSkewJoinOptimizer()); + } if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) { transformations.add(new GlobalLimitOptimizer()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g index 205604c..6ec596a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g @@ -86,7 +86,7 @@ fromClause joinSource @init { gParent.msgs.push("join source"); } @after { gParent.msgs.pop(); } - : fromSource ( joinToken^ fromSource (KW_ON! expression)? )* + : fromSource ( joinToken^ fromSource (KW_ON! expression)? (skewExpression)? )* | uniqueJoinToken^ uniqueJoinSource (COMMA! 
uniqueJoinSource)+ ; @@ -121,6 +121,26 @@ joinToken | KW_LEFT KW_SEMI KW_JOIN -> TOK_LEFTSEMIJOIN ; +skewExpression +@init { gParent.msgs.push("skew expression"); } +@after { gParent.msgs.pop(); } + : KW_SKEWED KW_ON LPAREN skewKeyExprs RPAREN -> ^(TOK_SKEW skewKeyExprs); + +skewKeyExprs +@init { gParent.msgs.push("skew key expression list"); } +@after { gParent.msgs.pop(); } + : + skewKeyExpr (COMMA skewKeyExpr)* -> ^(TOK_EXPLIST skewKeyExpr+) + ; + +skewKeyExpr +@init { gParent.msgs.push("skew key expression"); } +@after { gParent.msgs.pop(); } + : + expression (KW_CLUSTER KW_BY numerator=Number KW_PERCENT)? + -> ^(TOK_SKEW_EXPR expression $numerator?) + ; + lateralView @init {gParent.msgs.push("lateral view"); } @after {gParent.msgs.pop(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 2343a2c..396057b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -92,6 +92,8 @@ TOK_RIGHTOUTERJOIN; TOK_FULLOUTERJOIN; TOK_UNIQUEJOIN; TOK_CROSSJOIN; +TOK_SKEW; +TOK_SKEW_EXPR; TOK_LOAD; TOK_EXPORT; TOK_IMPORT; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java index 90abbe3..3db0ba1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java @@ -56,6 +56,12 @@ // join conditions private transient ArrayList> expressions; + // skew conditions + private transient ArrayList skewExprs; + + // skew percentage for a skew group, -1 for one partition + private transient ArrayList skewClusters; + // key index to nullsafe join flag private ArrayList nullsafes; @@ -367,4 +373,25 @@ public void addPostJoinFilter(ASTNode filter) { public List getPostJoinFilters() { return postJoinFilters; } + + public void setSkewExprs(ArrayList skewExprs) { + this.skewExprs = skewExprs; + } + + public ArrayList getSkewExprs() { + return skewExprs; + } + + public ArrayList getSkewClusters() { + return skewClusters; + } + + public void setSkewClusters(ArrayList skewClusters) { + this.skewClusters = skewClusters; + } + + public void clearSkewContexts() { + skewExprs = null; + skewClusters = null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index e9d9ee7..17922ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -145,6 +145,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.SkewContext; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; @@ -677,7 +678,7 @@ private boolean isJoinToken(ASTNode node) { @SuppressWarnings("nls") private void processJoin(QB qb, ASTNode join) throws SemanticException { int numChildren = join.getChildCount(); - if ((numChildren != 2) && (numChildren != 3) + if ((numChildren != 2) && (numChildren != 3) && (numChildren != 4) && join.getToken().getType() != HiveParser.TOK_UNIQUEJOIN) { throw new SemanticException(generateErrorMessage(join, "Join with multiple children")); @@ -6406,6 +6407,11 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree, 
joinSrcOp, srcOps, omitOpts); joinContext.put(joinOp, joinTree); + ArrayList skewExprs = joinTree.getSkewExprs(); + if (skewExprs != null && !skewExprs.isEmpty()) { + handleSkews(joinTree, srcOps); + } + Operator op = joinOp; for(ASTNode condn : joinTree.getPostJoinFilters() ) { op = genFilterPlan(qb, condn, op); @@ -6413,6 +6419,91 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree, return op; } + // using transitivity of join condition, make cluster expression for other aliases + // by replacing part of skew expression with join counterpart. + private boolean handleSkews(QBJoinTree joinTree, Operator[] srcOps) throws SemanticException { + ArrayList skewExprs = joinTree.getSkewExprs(); + + // for each group, there is one driving alias. + // keys from driving alias are randomly distributed to reducers in the group, + // but those from other aliases are copied to all reducers in the group. + ArrayList[] skewDrivers = new ArrayList[srcOps.length]; + ArrayList[] skewKeysExprs = new ArrayList[srcOps.length]; + for (int i = 0; i < skewExprs.size(); i++) { + int srcPos = candidate(skewExprs.get(i), joinTree.getBaseSrc()); + if (srcPos < 0) { + // there is no table reference, which means it's possibly a constant expression + throw new SemanticException(ErrorMsg.SKEWEXPR_HAS_NO_TABLEALIAS.getMsg( + skewExprs.get(i).toStringTree())); + } + ReduceSinkOperator driver = (ReduceSinkOperator)srcOps[srcPos]; + List sources = driver.getConf().getKeyCols(); + + RowResolver rr = opParseCtx.get(driver.getParentOperators().get(0)).getRowResolver(); + ExprNodeDesc skewKey = genExprNodeDesc(skewExprs.get(i), rr); + // skew condition should return boolean type + if (!skewKey.getTypeString().equals("boolean")) { + throw new SemanticException(ErrorMsg.SKEWEXPR_IS_NOT_BOOLEAN_TYPE.format( + skewKey.getExprString(), skewKey.getTypeString())); + } + for (int pos = 0; pos < srcOps.length; pos++) { + if (skewKeysExprs[pos] == null) { + skewKeysExprs[pos] = new ArrayList(); + skewDrivers[pos] = new ArrayList(); + } + skewDrivers[pos].add(pos == srcPos); + if (pos == srcPos) { + skewKeysExprs[pos].add(skewKey); + continue; + } + ReduceSinkOperator input = (ReduceSinkOperator)srcOps[pos]; + List targets = input.getConf().getKeyCols(); + ExprNodeDesc replaced = ExprNodeDescUtils.replace(skewKey, sources, targets); + if (replaced == null) { + throw new SemanticException(ErrorMsg.SKEWEXPR_IS_NOT_FOUND_IN_JOIN_CONDITION.format( + skewKey.getExprString())); + } + skewKeysExprs[pos].add(replaced); + } + } + // hand over skew context to all RSs for the join + for (int pos = 0; pos < srcOps.length; pos++) { + SkewContext context = new SkewContext(); + context.setSkewKeys(skewKeysExprs[pos]); + context.setSkewDrivers(skewDrivers[pos]); + context.setSkewClusters(joinTree.getSkewClusters()); + ((ReduceSinkOperator) srcOps[pos]).getConf().setSkewContext(context); + } + return true; + } + + // returns first table alias for candidate + private int candidate(ASTNode node, String[] aliases) { + int candidate = -1; + if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) { + String table = node.getChild(0).getText().toLowerCase(); + candidate = indexOf(unescapeIdentifier(table), aliases); + } + if (candidate < 0 && node.getChildCount() > 0) { + for (Node child : node.getChildren()) { + candidate = candidate((ASTNode)child, aliases); + if (candidate >= 0) { + return candidate; + } + } + } + return candidate; + } + + private int indexOf(String target, String[] sources) { + for (int i = 0; i < sources.length; i++) { + if 
(target.equals(sources[i])) { + return i; + } + } + return -1; + } + /** * Construct a selection operator for semijoin that filter out all fields * other than the group by keys. @@ -6987,8 +7078,15 @@ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree, filtersForPushing.add(new ArrayList()); joinTree.setFiltersForPushing(filtersForPushing); + ASTNode skewCond; ASTNode joinCond = (ASTNode) joinParseTree.getChild(2); ArrayList leftSrc = new ArrayList(); + if (joinCond != null && joinCond.getToken().getType() == HiveParser.TOK_SKEW) { + skewCond = joinCond; + joinCond = null; + } else { + skewCond = (ASTNode) joinParseTree.getChild(3); + } parseJoinCondition(joinTree, joinCond, leftSrc); if (leftSrc.size() == 1) { joinTree.setLeftAlias(leftSrc.get(0)); @@ -7028,6 +7126,33 @@ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree, parseStreamTables(joinTree, qb); } + // extract explicit skew expressions for inner join (not sure whether it is possible for outer joins) + if (skewCond != null && condn[0].getJoinType() == JoinType.INNER) { + ASTNode exprList = (ASTNode) skewCond.getChild(0); + assert exprList.getToken().getType() == HiveParser.TOK_EXPLIST; + ArrayList skewCluster = new ArrayList(); + ArrayList skewExpressions = new ArrayList(); + + int percentsum = 0; + for (int i = 0; i < exprList.getChildCount(); i++) { + ASTNode child = (ASTNode) exprList.getChild(i); + skewExpressions.add((ASTNode) child.getChild(0)); + if (child.getChildCount() > 1) { + String numerator = unescapeIdentifier(child.getChild(1).getText()); + int percent = Double.valueOf(numerator).intValue(); + skewCluster.add(percent); + percentsum += percent; + } else { + skewCluster.add(-1); + } + } + if (percentsum >= 100) { + throw new SemanticException(ErrorMsg.SUM_OF_SKEW_CLUSTER_SIZE_IS_TOO_BIG.getMsg()); + } + joinTree.setSkewExprs(skewExpressions); + joinTree.setSkewClusters(skewCluster); + } + return joinTree; } @@ -7201,6 +7326,10 @@ private void mergeJoins(QB qb, QBJoinTree node, QBJoinTree target, int pos, int[ } target.setMapAliases(mapAliases); } + + // merging skew contexts is not yet supported + node.clearSkewContexts(); + target.clearSkewContexts(); } private ObjectPair findMergePos(QBJoinTree node, QBJoinTree target) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 65c4e3f..80ec488 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.OpParseContext; @@ -511,4 +512,27 @@ public void setVectorMode(boolean vectorMode) { this.vectorMode = vectorMode; } + public boolean isUseInlineSkewContext() { + for (Operator operator : aliasToWork.values()) { + if (isUseInlineSkewContext(operator)) { + return true; + } + } + return false; + } + + private boolean isUseInlineSkewContext(Operator operator) { + if (operator instanceof ReduceSinkOperator && + ((ReduceSinkOperator)operator).getConf().getSkewContext() != null) { + return true; + } + if (operator.getChildOperators() != null && !operator.getChildOperators().isEmpty()) { + for (Operator child :
operator.getChildOperators()) { + if (isUseInlineSkewContext(child)) { + return true; + } + } + } + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 5837fac..502bdf5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -71,6 +71,7 @@ private int topN = -1; private float topNMemoryUsage = -1; private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded + private SkewContext skewContext; public ReduceSinkDesc() { } @@ -273,4 +274,13 @@ public void setDistinctColumnIndices( List> distinctColumnIndices) { this.distinctColumnIndices = distinctColumnIndices; } + + @Explain(displayName = "Explicit Skew Context") + public SkewContext getSkewContext() { + return skewContext; + } + + public void setSkewContext(SkewContext skewContext) { + this.skewContext = skewContext; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SkewContext.java ql/src/java/org/apache/hadoop/hive/ql/plan/SkewContext.java new file mode 100644 index 0000000..dc892a4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SkewContext.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Context for skewed RS + */ +public class SkewContext implements Serializable { + + private static final long serialVersionUID = 1L; + + private ArrayList skewKeys; + private ArrayList skewClusters; + private ArrayList skewDrivers; + + private transient ExprNodeEvaluator[] keyEvals; + private transient ObjectInspector[] keyObjectInspectors; + + private transient boolean[] drivers; + private transient int[] clusters; + + public void initialize() throws HiveException { + int i = 0; + keyEvals = new ExprNodeEvaluator[skewKeys.size()]; + for (ExprNodeDesc e : skewKeys) { + keyEvals[i++] = ExprNodeEvaluatorFactory.get(e); + } + drivers = getBooleanArray(skewDrivers); + clusters = getIntegerArray(skewClusters); + } + + public boolean initializeKey(HiveKey keyWritable, + ObjectInspector rowInspector, int numReducer) throws HiveException { + boolean runSkew = keyWritable.initializeSkew(numReducer, drivers, clusters); + if (runSkew) { + keyObjectInspectors = Operator.initEvaluators(keyEvals, rowInspector); + } + return runSkew; + } + + private boolean[] getBooleanArray(List booleans) { + boolean[] array = new boolean[booleans.size()]; + for ( int i = 0 ; i < array.length; i++) { + array[i] = booleans.get(i); + } + return array; + } + + private int[] getIntegerArray(List integers) { + int[] array = new int[integers.size()]; + for ( int i = 0 ; i < array.length; i++) { + array[i] = integers.get(i); + } + return array; + } + + @Explain(displayName = "key expressions") + public ArrayList getSkewKeys() { + return skewKeys; + } + + public void setSkewKeys(ArrayList skewKeys) { + this.skewKeys = skewKeys; + } + + @Explain(displayName = "cluster sizes") + public ArrayList getSkewClusters() { + return skewClusters; + } + + public void setSkewClusters(ArrayList skewClusters) { + this.skewClusters = skewClusters; + } + + @Explain(displayName = "drivers", normalExplain = false) + public ArrayList getSkewDrivers() { + return skewDrivers; + } + + public void setSkewDrivers(ArrayList skewDrivers) { + this.skewDrivers = skewDrivers; + } + + // Evaluate skew group and hashcode for the row + // If it's non skewed, return false for normal hashing + public boolean evaluateSkew(Object row, HiveKey hiveKey) throws HiveException { + for (int i = 0; i < keyEvals.length; i++) { + Object eval = keyEvals[i].evaluate(row); + if (eval != null && ((BooleanObjectInspector) keyObjectInspectors[i]).get(eval)) { + hiveKey.setSkewGroup(i); + return true; + } + } + hiveKey.setSkewGroup(-1); + return false; + } + + public boolean isDriver(HiveKey hiveKey) { + return drivers[hiveKey.getSkewGroup()]; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java index a8c875c..8059db4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java @@ -317,4 +317,12 @@ public 
double quantile(double q) { public int getNumBins() { return bins == null ? 0 : bins.size(); } + + public double[][] publish() { + double[][] published = new double[nusedbins][]; + for (int i = 0; i < published.length; i++) { + published[i] = new double[] {bins.get(i).x, bins.get(i).y}; + } + return published; + } } diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid1.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid1.q new file mode 100644 index 0000000..c1c52d8 --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid1.q @@ -0,0 +1,5 @@ +-- negative, constant expression +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + 0 = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid2.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid2.q new file mode 100644 index 0000000..f5e1399 --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid2.q @@ -0,0 +1,5 @@ +-- negative, skew expression is not containing join condition +explain select count(*) from src a JOIN src b ON a.key+1=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid3.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid3.q new file mode 100644 index 0000000..a7e6850 --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid3.q @@ -0,0 +1,3 @@ +-- negative, skew expression should be a boolean type +explain select count(*) from src a JOIN src b ON a.key+1=b.key SKEWED ON ( + a.key + 100 CLUSTER BY 20 PERCENT); diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid4.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid4.q new file mode 100644 index 0000000..81a2a1d --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid4.q @@ -0,0 +1,5 @@ +-- negative, sum of cluster size exceeds 99 percent +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 40 PERCENT, + b.key = 100 CLUSTER BY 40 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); diff --git ql/src/test/queries/clientpositive/skewjoin_explicit.q ql/src/test/queries/clientpositive/skewjoin_explicit.q new file mode 100644 index 0000000..7b38e12 --- /dev/null +++ ql/src/test/queries/clientpositive/skewjoin_explicit.q @@ -0,0 +1,107 @@ +select count(*) from src a JOIN src b ON a.key=b.key; + +set hive.exec.reducers.bytes.per.reducer=600; + +-- simple (one partition for each cluster) +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300); + +select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300); + +-- percent based assigning +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +-- subquery +explain select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 
CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +-- nested skewjoin +explain +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT); + +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT); + +set hive.auto.convert.join = true; +set hive.auto.convert.join.noconditionaltask = false; + +-- auto-convert (map-join) +explain +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key; + +-- auto-convert (map-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key; + +set hive.mapjoin.smalltable.filesize=100; + +-- auto-convert (common-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key; + +reset; +set hive.exec.reducers.bytes.per.reducer=3000; + +-- negative, 3 reducers for 5 groups (4 skewed + 1 non-skewed), disabled at runtime +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT); + +select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT); diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid1.q.out ql/src/test/results/clientnegative/skewjoin_explicit_invalid1.q.out new file mode 100644 index 0000000..2288a96 --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid1.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10251]: Failed to find alias for skew expression (= 0 0) diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid2.q.out ql/src/test/results/clientnegative/skewjoin_explicit_invalid2.q.out new file mode 100644 index 0000000..2a3bdcd --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid2.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10253]: Skew expression (key = 0) is not found in join condition diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid3.q.out
ql/src/test/results/clientnegative/skewjoin_explicit_invalid3.q.out new file mode 100644 index 0000000..d25ecf5 --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid3.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10252]: Skew expression (key + 100) has double type, which should be a boolean type diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid4.q.out ql/src/test/results/clientnegative/skewjoin_explicit_invalid4.q.out new file mode 100644 index 0000000..d7fc87b --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid4.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10254]: Sum of skew cluster size is too big. diff --git ql/src/test/results/clientpositive/skewjoin_explicit.q.out ql/src/test/results/clientpositive/skewjoin_explicit.q.out new file mode 100644 index 0000000..b6cebff --- /dev/null +++ ql/src/test/results/clientpositive/skewjoin_explicit.q.out @@ -0,0 +1,1366 @@ +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +PREHOOK: query: -- simple (one partition for each cluster) +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +PREHOOK: type: QUERY +POSTHOOK: query: -- simple (one partition for each cluster) +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL a) key) 0)) (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL b) key) 100)) (TOK_SKEW_EXPR (> (TOK_FUNCTION TOK_INT (.
(TOK_TABLE_OR_COL a) key)) 300)))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: -1, -1, -1 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 100) + type: boolean + expr: (UDFToInteger(key) > 300) + type: boolean + tag: 0 + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: -1, -1, -1 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 100) + type: boolean + expr: (UDFToInteger(key) > 300) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +PREHOOK: query: -- percent based assigning +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- percent based assigning +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL a) key) 0) 20) (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL b) key) 100) 20) (TOK_SKEW_EXPR (> (TOK_FUNCTION TOK_INT (. (TOK_TABLE_OR_COL a) key)) 300) 40))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 100) + type: boolean + expr: (UDFToInteger(key) > 300) + type: boolean + tag: 0 + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 100) + type: boolean + expr: (UDFToInteger(key) > 300) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +PREHOOK: query: -- subquery +explain select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- subquery +explain select count(*) from (select distinct key from 
src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL key))))) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL a) key) 0) 20) (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL b) key) 100) 20) (TOK_SKEW_EXPR (> (TOK_FUNCTION TOK_INT (. (TOK_TABLE_OR_COL a) key)) 300) 40))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-3 + Map Reduce + Alias -> Map Operator Tree: + a:src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + outputColumnNames: key + Group By Operator + bucketGroup: false + keys: + expr: key + type: string + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + Reduce Operator Tree: + Group By Operator + bucketGroup: false + keys: + expr: KEY._col0 + type: string + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + $INTNAME + TableScan + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + expr: (_col0 = 0) + type: boolean + expr: (_col0 = 100) + type: boolean + expr: (UDFToInteger(_col0) > 300) + type: boolean + tag: 0 + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 100) + type: boolean + expr: (UDFToInteger(key) > 300) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + 
Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +500 +PREHOOK: query: -- nested skewjoin +explain +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- nested skewjoin +explain +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (< (. (TOK_TABLE_OR_COL a) key) 10) 20) (TOK_SKEW_EXPR (< (. (TOK_TABLE_OR_COL b) key) 20) 20))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 70)))) X) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (< (. (TOK_TABLE_OR_COL a) key) 60) 20) (TOK_SKEW_EXPR (< (. (TOK_TABLE_OR_COL b) key) 70) 20))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL a) key) 50)))) Y) (= (. (TOK_TABLE_OR_COL X) key) (. (TOK_TABLE_OR_COL Y) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (< (. (TOK_TABLE_OR_COL X) key) 55) 20) (TOK_SKEW_EXPR (> (. 
(TOK_TABLE_OR_COL Y) key) 65) 20))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1, Stage-4 + Stage-4 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x:a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 70) + type: boolean + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (key < 10) + type: boolean + expr: (key < 20) + type: boolean + tag: 0 + value expressions: + expr: key + type: string + expr: value + type: string + x:b + TableScan + alias: b + Filter Operator + predicate: + expr: (key < 70) + type: boolean + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (key < 10) + type: boolean + expr: (key < 20) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + handleSkewJoin: false + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + $INTNAME + TableScan + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (_col0 < 55) + type: boolean + expr: (_col0 > 65) + type: boolean + tag: 0 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + $INTNAME1 + TableScan + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (_col0 < 55) + type: boolean + expr: (_col0 > 65) + type: boolean + tag: 1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 {VALUE._col0} {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + expr: _col3 + type: string + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-4 + Map Reduce + Alias -> Map Operator Tree: + y:a + TableScan + alias: a + Filter Operator + predicate: + expr: (key > 50) + type: boolean + Reduce Output 
Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (key < 60) + type: boolean + expr: (key < 70) + type: boolean + tag: 0 + value expressions: + expr: key + type: string + expr: value + type: string + y:b + TableScan + alias: b + Filter Operator + predicate: + expr: (key > 50) + type: boolean + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (key < 60) + type: boolean + expr: (key < 70) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + handleSkewJoin: false + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +53 val_53 53 val_53 +54 val_54 54 val_54 +57 val_57 57 val_57 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +64 val_64 64 val_64 +65 val_65 65 val_65 +66 val_66 66 val_66 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 
val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +69 val_69 69 val_69 +PREHOOK: query: -- auto-convert (map-join) +explain +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +PREHOOK: type: QUERY +POSTHOOK: query: -- auto-convert (map-join) +explain +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (< (TOK_TABLE_OR_COL key) 70)))) X) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (> (TOK_TABLE_OR_COL key) 50)))) Y) (= (. (TOK_TABLE_OR_COL X) key) (. (TOK_TABLE_OR_COL Y) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (< (. (TOK_TABLE_OR_COL X) key) 55) 20) (TOK_SKEW_EXPR (> (. (TOK_TABLE_OR_COL Y) key) 65) 20))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME X)))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (. (TOK_TABLE_OR_COL X) key))))) + +STAGE DEPENDENCIES: + Stage-6 is a root stage , consists of Stage-7, Stage-8, Stage-1 + Stage-7 has a backup stage: Stage-1 + Stage-4 depends on stages: Stage-7 + Stage-2 depends on stages: Stage-1, Stage-4, Stage-5 + Stage-8 has a backup stage: Stage-1 + Stage-5 depends on stages: Stage-8 + Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-6 + Conditional Operator + + Stage: Stage-7 + Map Reduce Local Work + Alias -> Map Local Tables: + y:src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + y:src + TableScan + alias: src + Filter Operator + predicate: + expr: (key > 50) + type: boolean + Select Operator + expressions: + expr: key + type: string + outputColumnNames: _col0 + HashTable Sink Operator + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col0]] + 1 [Column[_col0]] + Position of Big Table: 0 + + Stage: Stage-4 + Map Reduce + Alias -> Map Operator Tree: + x:src + TableScan + alias: src + Filter Operator + predicate: + expr: (key < 70) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col0]] + 1 [Column[_col0]] + outputColumnNames: _col0, _col1 + Position of Big Table: 0 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort 
order: + + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Extract + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-8 + Map Reduce Local Work + Alias -> Map Local Tables: + x:src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + x:src + TableScan + alias: src + Filter Operator + predicate: + expr: (key < 70) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + HashTable Sink Operator + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col0]] + 1 [Column[_col0]] + Position of Big Table: 1 + + Stage: Stage-5 + Map Reduce + Alias -> Map Operator Tree: + y:src + TableScan + alias: src + Filter Operator + predicate: + expr: (key > 50) + type: boolean + Select Operator + expressions: + expr: key + type: string + outputColumnNames: _col0 + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + handleSkewJoin: false + keys: + 0 [Column[_col0]] + 1 [Column[_col0]] + outputColumnNames: _col0, _col1 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x:src + TableScan + alias: src + Filter Operator + predicate: + expr: (key < 70) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (_col0 < 55) + type: boolean + expr: (_col0 > 65) + type: boolean + tag: 0 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + y:src + TableScan + alias: src + Filter Operator + predicate: + expr: (key > 50) + type: boolean + Select Operator + expressions: + expr: key + type: string + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + expr: (_col0 < 55) + type: boolean + expr: (_col0 > 65) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 + handleSkewJoin: false + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + 
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: -- auto-convert (map-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: -- auto-convert (map-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +51 val_51 +51 val_51 +51 val_51 +51 val_51 +53 val_53 +54 val_54 +57 val_57 +58 val_58 +58 val_58 +58 val_58 +58 val_58 +64 val_64 +65 val_65 +66 val_66 +67 val_67 +67 val_67 +67 val_67 +67 val_67 +69 val_69 +PREHOOK: query: -- auto-convert (common-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: -- auto-convert (common-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +51 val_51 +51 val_51 +51 val_51 +51 val_51 +53 val_53 +54 val_54 +57 val_57 +58 val_58 +58 val_58 +58 val_58 +58 val_58 +64 val_64 +65 val_65 +66 val_66 +67 val_67 +67 val_67 +67 val_67 +67 val_67 +69 val_69 +PREHOOK: query: -- negative, 3 reducers for 5 groups (4 skewed +1 non-skewed), disabled at runtime +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- negative, 3 reducers for 5 groups (4 skewed +1 non-skewed), disabled at runtime +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (TOK_SKEW (TOK_EXPLIST (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL a) key) 0) 10) (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL a) key) 10) 10) (TOK_SKEW_EXPR (= (. (TOK_TABLE_OR_COL a) key) 20) 10) (TOK_SKEW_EXPR (= (. 
(TOK_TABLE_OR_COL b) key) 30) 10))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 10, 10, 10, 10 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 10) + type: boolean + expr: (key = 20) + type: boolean + expr: (key = 30) + type: boolean + tag: 0 + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + Explicit Skew Context: + cluster sizes: 10, 10, 10, 10 + key expressions: + expr: (key = 0) + type: boolean + expr: (key = 10) + type: boolean + expr: (key = 20) + type: boolean + expr: (key = 30) + type: boolean + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028
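
Editorial note (not part of the patch): the explain plans above show the new "Explicit Skew Context" carrying per-cluster sizes (-1 when a skew expression has no CLUSTER BY clause, otherwise the PERCENT value) alongside the boolean skew key expressions, and the last test shows the handling being disabled at runtime when 3 reducers cannot cover 5 groups (4 skewed + 1 non-skewed). The sketch below only illustrates that sizing arithmetic; it is not the patch's SkewContext implementation, the class and method names are hypothetical, and the exact rounding the patch uses may differ.

// Hypothetical sketch only -- not the patch's SkewContext code.
// Shows how "CLUSTER BY n PERCENT" sizes (or -1 for "one reducer per
// cluster") could be turned into reducer counts, and why the negative
// test ("3 reducers for 5 groups") must fall back to a plain join.
public class SkewClusterSketch {

  /**
   * Returns the reducer count assigned to each skew cluster, followed by
   * the count left for non-skewed keys, or null if the layout does not fit.
   */
  static int[] assign(int[] clusterSizePercents, int numReducers) {
    int[] assigned = new int[clusterSizePercents.length + 1];
    int used = 0;
    for (int i = 0; i < clusterSizePercents.length; i++) {
      int size = clusterSizePercents[i] < 0
          ? 1                                                   // no percent given: one reducer per cluster
          : Math.max(1, numReducers * clusterSizePercents[i] / 100);
      assigned[i] = size;
      used += size;
    }
    int remaining = numReducers - used;
    if (remaining < 1) {
      return null;  // not enough reducers for every cluster plus the non-skewed keys
    }
    assigned[clusterSizePercents.length] = remaining;
    return assigned;
  }

  public static void main(String[] args) {
    // "cluster sizes: 20, 20, 40" from the explain output, with 10 reducers:
    // the clusters get 2, 2 and 4 reducers, leaving 2 for non-skewed keys.
    System.out.println(java.util.Arrays.toString(assign(new int[]{20, 20, 40}, 10)));
    // Negative test: four 10-percent clusters but only 3 reducers -> null,
    // so explicit skew handling is disabled and the join runs normally.
    System.out.println(java.util.Arrays.toString(assign(new int[]{10, 10, 10, 10}, 3)));
  }
}

Under these assumptions, cluster sizes of 20, 20, 40 on ten reducers would dedicate 2, 2 and 4 reducers to the skewed keys and leave 2 for everything else, while the negative case above (four 10-percent clusters on three reducers) cannot satisfy the minimum of one reducer per group and therefore reverts to the ordinary join path at runtime, which is why its result (1028) matches the unskewed baseline.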