diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5f9d3ca..f4d9e83 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1091,6 +1091,8 @@ "If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" + "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."), + HIVE_OPTIMIZE_INLINE_SKEWJOIN("hive.optimize.inline.skewjoin", false, "Enable inline skew join."), + // Indexes HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024, "Minimum size (in bytes) of the inputs on which a compact index is automatically used."), // 5G diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 31978fe..4733797 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -422,6 +422,13 @@ ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " + "sorted, table {0}", true), + SKEWEXPR_HAS_NO_TABLEALIAS(10298, "Failed to find alias for skew expression"), + SKEWEXPR_IS_NOT_BOOLEAN_TYPE(10299, + "Skew expression {0} has {1} type, which should be a boolean type", true), + SKEWEXPR_IS_NOT_FOUND_IN_JOIN_CONDITION(10300, + "Skew expression {0} is not found in join condition", true), + SUM_OF_SKEW_CLUSTER_SIZE_IS_TOO_BIG(10301, "Sum of skew cluster size is too big."), + //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 3dc7c76..222c740 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -931,7 +931,7 @@ public String dump(int level, HashSet seenOpts) { * Initialize an array of ExprNodeEvaluator and return the result * ObjectInspectors. */ - protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, + public static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, ObjectInspector rowInspector) throws HiveException { ObjectInspector[] result = new ObjectInspector[evals.length]; for (int i = 0; i < evals.length; i++) { @@ -944,7 +944,7 @@ public String dump(int level, HashSet seenOpts) { * Initialize an array of ExprNodeEvaluator from start, for specified length * and return the result ObjectInspectors. 
*/ - protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, + public static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, int start, int length, ObjectInspector rowInspector) throws HiveException { ObjectInspector[] result = new ObjectInspector[length]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index d8698da..0fe1f4d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SkewContext; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDeException; @@ -66,11 +67,14 @@ } private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName()); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final long serialVersionUID = 1L; - private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); + + private static final MurmurHash MURMUR = (MurmurHash) MurmurHash.getInstance(); + private static final int RANDOM_SEED = 12345; private transient ObjectInspector[] partitionObjectInspectors; private transient ObjectInspector[] bucketObjectInspectors; @@ -102,6 +106,9 @@ * Evaluators for bucketing columns. This is used to compute bucket number. */ protected transient ExprNodeEvaluator[] bucketEval = null; + + protected transient SkewContext skewContext; + // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready protected transient Serializer keySerializer; protected transient boolean keyIsText; @@ -146,6 +153,7 @@ @Override protected void initializeOp(Configuration hconf) throws HiveException { + conf.setNumReducers(hconf.getInt("mapred.reduce.tasks", -1)); try { List keys = conf.getKeyCols(); @@ -161,6 +169,10 @@ protected void initializeOp(Configuration hconf) throws HiveException { for (ExprNodeDesc e : keys) { keyEval[i++] = ExprNodeEvaluatorFactory.get(e); } + skewContext = conf.getSkewContext(); + if (skewContext != null) { + skewContext.initialize(); + } numDistributionKeys = conf.getNumDistributionKeys(); distinctColIndices = conf.getDistinctColumnIndices(); @@ -309,6 +321,10 @@ public void processOp(Object row, int tag) throws HiveException { if (bucketEval != null) { bucketObjectInspectors = initEvaluators(bucketEval, rowInspector); } + if (skewContext != null && + !skewContext.initializeKey(keyWritable, rowInspector, conf.getNumReducers())) { + skewContext = null; // disable + } int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1; int keyLen = numDistinctExprs > 0 ? 
numDistributionKeys + 1 : numDistributionKeys; cachedKeys = new Object[numKeys][keyLen]; @@ -342,7 +358,7 @@ public void processOp(Object row, int tag) throws HiveException { if (autoParallel && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { - hashCode = computeHashCode(row); + hashCode = computeHashCode(row, bucketNumber); } firstKey.setHashCode(hashCode); @@ -435,10 +451,14 @@ private void populateCachedDistinctKeys(Object row, int index) throws HiveExcept } protected final int computeMurmurHash(HiveKey firstKey) { - return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); + return MURMUR.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); } - private int computeHashCode(Object row) throws HiveException { + private int computeHashCode(Object row, int buckNum) throws HiveException { + if (skewContext != null && skewContext.evaluateSkew(row, keyWritable)) { + keyWritable.setHashCode(skewContext.isDriver(keyWritable) ? nextRandom() : -1); + return 0; + } // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { @@ -448,10 +468,7 @@ private int computeHashCode(Object row) throws HiveException { // For acid operations make sure to send all records with the same key to the same // FileSinkOperator, as the RecordUpdater interface can't manage multiple writers for a file. if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { - if (random == null) { - random = new Random(12345); - } - keyHashCode = random.nextInt(); + keyHashCode = nextRandom(); } else { keyHashCode = 1; } @@ -462,10 +479,11 @@ private int computeHashCode(Object row) throws HiveException { + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); } } + int hashCode = bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber; if (isTraceEnabled) { - LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber)); + LOG.trace("Going to return hash code " + hashCode); } - return bucketNumber < 0 ? 
keyHashCode : keyHashCode * 31 + bucketNumber; + return hashCode; } private boolean partitionKeysAreNull(Object row) throws HiveException { @@ -481,6 +499,13 @@ private boolean partitionKeysAreNull(Object row) throws HiveException { return false; } + protected int nextRandom() { + if (random == null) { + random = new Random(RANDOM_SEED); + } + return random.nextInt(); + } + // Serialize the keys and append the tag protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException { BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector); @@ -502,11 +527,11 @@ public void collect(byte[] key, byte[] value, int hash) throws IOException { collect(keyWritable, valueWritable); } - protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { + protected void collect(HiveKey keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called if (null != out) { - out.collect(keyWritable, valueWritable); + keyWritable.collect(out, valueWritable); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 98cf2a7..d86cb5d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.io.IOPrepareCache; +import org.apache.hadoop.hive.ql.io.SkewedKeyPartitioner; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -234,11 +235,15 @@ public int execute(DriverContext driverContext) { job.setMapOutputKeyClass(HiveKey.class); job.setMapOutputValueClass(BytesWritable.class); - try { - job.setPartitionerClass((Class) (Class.forName(HiveConf.getVar(job, - HiveConf.ConfVars.HIVEPARTITIONER)))); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e.getMessage()); + if (mWork.isUseInlineSkewContext()) { + job.setPartitionerClass(SkewedKeyPartitioner.class); + } else { + try { + job.setPartitionerClass((Class) (Class.forName(HiveConf.getVar(job, + HiveConf.ConfVars.HIVEPARTITIONER)))); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getMessage()); + } } if (mWork.getNumMapTasks() != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java index f9cf2bd..fe967f2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java @@ -20,6 +20,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapred.OutputCollector; + +import java.io.IOException; /** * HiveKey is a simple wrapper on Text which allows us to set the hashCode @@ -34,6 +37,13 @@ private transient int distKeyLength; + transient int skewGroup = -1; + transient int skewOffset = -1; + + transient boolean[] drivers; + transient int[] skewGroupSize; // size of partition for skewed group + transient int[] skewGroupStart; // starting number of partition for skewed group + public HiveKey() { hashCodeValid = false; } @@ -49,6 +59,70 @@ public void setHashCode(int myHashCode) { hashCode = myHashCode; } + public void setSkewGroup(int skewGroup) { + 
this.skewGroup = skewGroup; + } + + public int getSkewGroup() { + return skewGroup; + } + + public boolean initializeSkew(int numReduceTasks, boolean[] skewDrivers, int[] skewClusters) { + drivers = skewDrivers; + skewGroupSize = getSkewAssigned(numReduceTasks, skewClusters); + skewGroupStart = getSkewOffsets(numReduceTasks, skewGroupSize); + return skewGroupSize != null; + } + + public boolean isSkewed() { + return skewGroupSize != null; + } + + @SuppressWarnings("unchecked") + public void collect(OutputCollector out, Object value) throws IOException { + if (skewGroup >= 0 && !drivers[skewGroup] && skewGroupSize[skewGroup] > 1) { + // non-driving skewed key. will be propagated to all partitions in the group + for (int i = 0; i < skewGroupSize[skewGroup]; i++) { + skewOffset = i; // this is referenced by SkewedKeyPartitioner + out.collect(this, value); + } + skewOffset = -1; + } else { + out.collect(this, value); + } + } + + // calculates number of reducers for each group + // numReduceTasks : total number of reducer + // skewedSize : size of each group, >= 0 means percent, < 0 means constant number + // + // remaining reducers are assigned to non-skewed keys (skewGroup = -1) + // if remaining reducers is less than one, skew join will be disabled and back to normal join + private int[] getSkewAssigned(int numReduceTasks, int[] skewedSize) { + int remain = numReduceTasks; + int[] assigned = new int[skewedSize.length]; + for (int i = 0; i < assigned.length; i++) { + remain -= assigned[i] = skewedSize[i] < 0 ? -skewedSize[i] : + Math.max(1, (int)(numReduceTasks * (skewedSize[i] / 100.0f))); + } + return remain < 1 ? null : assigned; + } + + // calculate start offset for each group + // for non-skewed group, partition number is 0 ~ offset[0] + // for skew group n, partition number is offset[n] ~ offset[n] - 1 + private int[] getSkewOffsets(int numReduceTasks, int[] assigned) { + if (assigned == null) { + return null; + } + int remain = numReduceTasks; + int[] offsets = new int[assigned.length]; + for (int i = offsets.length - 1; i >= 0; i--) { + offsets[i] = remain -= assigned[i]; + } + return offsets; + } + @Override public int hashCode() { if (!hashCodeValid) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/SkewedKeyPartitioner.java ql/src/java/org/apache/hadoop/hive/ql/io/SkewedKeyPartitioner.java new file mode 100644 index 0000000..bce55b8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/SkewedKeyPartitioner.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +/** + * Assigns partition number by precalculated skewGroup and skewOffset + * + * 1. skewGroup < 0 (non skewed) + * 2. skewGroup >= 0 and skewOffset >= 0 (skewed, non-driving) + * 3. 
skewGroup >= 0 and skewOffset < 0 (skewed, driving) + */ +public class SkewedKeyPartitioner extends DefaultHivePartitioner { + + @Override + public int getPartition(HiveKey key, V value, int numReduceTasks) { + if (!key.isSkewed()) { + // normal partitioning + return super.getPartition(key, value, numReduceTasks); + } + if (key.skewGroup < 0) { + return super.getPartition(key, value, key.skewGroupStart[0]); // case 1 + } + int groupSize = key.skewGroupSize[key.skewGroup]; + int groupStart = key.skewGroupStart[key.skewGroup]; + if (groupSize == 1) { + return groupStart; + } + if (key.skewOffset >= 0) { + return groupStart + key.skewOffset; // case 2 + } + return groupStart + super.getPartition(key, value, groupSize); // case 3 + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/InlineSkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/InlineSkewJoinOptimizer.java new file mode 100644 index 0000000..9e81255 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/InlineSkewJoinOptimizer.java @@ -0,0 +1,318 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.ql.exec.ByteWritable; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QBJoinTree; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.SkewContext; +import org.apache.hadoop.hive.ql.udf.generic.NumericHistogram; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +public class InlineSkewJoinOptimizer implements Transform { + + private static final Log LOG = LogFactory.getLog(InlineSkewJoinOptimizer.class); + + public ParseContext transform(ParseContext pctx) throws SemanticException { + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "JOIN%"), new InlineSkewJoinProcessor()); + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, new SkewOptContext(pctx)); + GraphWalker ogw = new DefaultGraphWalker(disp); + + List topNodes = new ArrayList(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + return pctx; + } + + private static class SkewOptContext implements NodeProcessorCtx { + private ParseContext parseContext; + public SkewOptContext(ParseContext pctx) { + this.parseContext = pctx; + } + } + + private static class InlineSkewJoinProcessor implements NodeProcessor { + + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + ParseContext pctx = ((SkewOptContext)procCtx).parseContext; + JoinOperator join = (JoinOperator) nd; + QBJoinTree jointTree = pctx.getJoinContext().get(join); + if (jointTree.getSkewExprs() != null && !jointTree.getSkewExprs().isEmpty()) { + return false; // explicit skew context + } + + Operator[] parents = (Operator[]) join.getParentOperators().toArray(); + + // find driver pos, use biggest one if possible + int driver = parents.length - 1; + List streams = jointTree.getStreamAliases(); + if (streams != null && !streams.isEmpty()) { + driver = JoinReorder.getBiggestPos(join, streams); + } + + ReduceSinkOperator rs = (ReduceSinkOperator) parents[driver]; + TableScanOperator ts = findSingleParent(rs, TableScanOperator.class); + if (ts == null || pctx.getTopToTable().get(ts) == null) { + return false; + } + Table table = pctx.getTopToTable().get(ts); + List sources = rs.getConf().getKeyCols(); + + // revert key expressions of RS to columns of TS + List backtrack = ExprNodeDescUtils.backtrack(sources, rs, ts); + + KeyDistribution[] skewness = getSkewness(pctx, table, backtrack); + if (skewness == null || skewness.length == 0) { + return false; + } + + ArrayList[] skewDrivers = new ArrayList[parents.length]; + ArrayList[] skewClusters = new ArrayList[parents.length]; + ArrayList[] skewKeysExprs = new ArrayList[parents.length]; + + for (KeyDistribution skew : skewness) { + if (!skew.expression.getTypeString().equals("boolean")) { + LOG.info("Boolean type expression is needed. " + + skew.expression.getExprString() + " has type of " + skew.expression.getTypeString()); + return false; + } + int skewPercent = (int) (skew.percent * 100); + for (int pos = 0; pos < parents.length; pos++) { + if (skewKeysExprs[pos] == null) { + skewDrivers[pos] = new ArrayList(); + skewClusters[pos] = new ArrayList(); + skewKeysExprs[pos] = new ArrayList(); + } + skewDrivers[pos].add(pos == driver); + skewClusters[pos].add(skewPercent); + if (pos == driver) { + skewKeysExprs[pos].add(skew.expression); + continue; + } + ReduceSinkOperator input = (ReduceSinkOperator)parents[pos]; + List targets = input.getConf().getKeyCols(); + ExprNodeDesc skewKey = ExprNodeDescUtils.replace(skew.expression, sources, targets); + if (skewKey == null) { + LOG.info("Failed to find join condition for " + skewKey); + return false; + } + skewKeysExprs[pos].add(skewKey); + } + } + + // hand over skew context to all RSs for the join + for (int pos = 0; pos < parents.length; pos++) { + SkewContext context = new SkewContext(); + context.setSkewKeys(skewKeysExprs[pos]); + context.setSkewDrivers(skewDrivers[pos]); + context.setSkewClusters(skewClusters[pos]); + ((ReduceSinkOperator) parents[pos]).getConf().setSkewContext(context); + } + return true; + } + + private KeyDistribution[] getSkewness(ParseContext pctx, Table table, + List sources) throws SemanticException { + try { + return getSkewness(Hive.get(pctx.getConf()), table, sources); + } catch (Exception e) { + LOG.info("Failed to access column statistics for skewness", e); + } + return null; + } + + private List getPrimitiveColumns(List sources) { + Set columns = new HashSet(); + for (ExprNodeDesc source : sources) { + if (isPrimitiveColumn(source)) { + columns.add(((ExprNodeColumnDesc) source).getColumn()); + } + } + return columns.isEmpty() ? 
Collections.emptyList() : new ArrayList(columns); + } + + private boolean isPrimitiveColumn(ExprNodeDesc source) { + return source instanceof ExprNodeColumnDesc && isPrimitiveNumeric(source.getTypeInfo()); + } + + private KeyDistribution[] getSkewness(Hive hive, Table table, List sources) + throws Exception { + + List columns = getPrimitiveColumns(sources); + if (columns.isEmpty()) { + return new KeyDistribution[0]; + } + List columnStats = + hive.getTableColumnStatistics(table.getDbName(), table.getTableName(), columns); + + List result = new ArrayList(); + for (ExprNodeDesc source : sources) { + if (!isPrimitiveColumn(source)) { + continue; + } + PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) source.getTypeInfo(); + String column = ((ExprNodeColumnDesc)source).getColumn(); + + ColumnStatisticsObj stats = columnStats.get(columns.indexOf(column)); + + NumericHistogram histogram = getNumericHistogram(stats.getStatsData()); + if (histogram == null) { + continue; + } + double[][] published = histogram.publish(); + if (published.length < 10) { + continue; + } + long countSum = 0; + for (int i = 0; i < published.length; i++) { + countSum += (long) published[i][1]; + } + for (int i = 0; i < published.length; i++) { + if (published[i][1] * published.length > (countSum << 1)) { + float percent = (float) (published[i][1] / countSum); + ExprNodeDesc expr = null; + if (i > 0) { + List children = new ArrayList(); + children.add(source.clone()); + children.add(cast(published[i - 1][0], ptype)); + expr = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo(">").getGenericUDF(), children); + percent += published[i - 1][1] / countSum / 2; + } + if (i < published.length - 1) { + List children = new ArrayList(); + children.add(source.clone()); + children.add(cast(published[i + 1][0], ptype)); + ExprNodeDesc expr2 = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo("<").getGenericUDF(), children); + percent += published[i + 1][1] / countSum / 2; + expr = expr == null ? 
expr2 : ExprNodeDescUtils.mergePredicates(expr, expr2); + } + result.add(new KeyDistribution(expr, percent)); + } + } + } + return result.toArray(new KeyDistribution[result.size()]); + } + + // TODO get histogram from column stats + private NumericHistogram getNumericHistogram(ColumnStatisticsData data) { + return null; + } + } + + private static boolean isPrimitiveNumeric(TypeInfo type) { + if (type.getCategory() != ObjectInspector.Category.PRIMITIVE) { + return false; + } + switch (((PrimitiveTypeInfo) type).getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return true; + } + return false; + } + + private static ExprNodeConstantDesc cast(double value, PrimitiveTypeInfo ptype) { + switch (ptype.getPrimitiveCategory()) { + case BYTE: + return new ExprNodeConstantDesc(ptype, new ByteWritable((byte)value)); + case SHORT: + return new ExprNodeConstantDesc(ptype, new ShortWritable((short)value)); + case INT: + return new ExprNodeConstantDesc(ptype, new IntWritable((int)value)); + case LONG: + return new ExprNodeConstantDesc(ptype, new LongWritable((long)value)); + case FLOAT: + return new ExprNodeConstantDesc(ptype, new FloatWritable((float)value)); + case DOUBLE: + return new ExprNodeConstantDesc(ptype, new DoubleWritable(value)); + } + throw new IllegalStateException("Not supported category " + ptype.getPrimitiveCategory()); + } + + private static T findSingleParent(Operator start, Class target) { + if (start.getParentOperators() == null || start.getParentOperators().size() != 1) { + return null; + } + Operator parent = start.getParentOperators().get(0); + if (parent.getClass() == target) { + return (T) parent; + } + return findSingleParent(parent, target); + } + + public static class KeyDistribution { + private ExprNodeDesc expression; + private float percent; + public KeyDistribution(ExprNodeDesc expression, float percent) { + this.expression = expression; + this.percent = percent; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java index 9238e0e..850ed50 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -50,8 +51,8 @@ * @return The estimated size - 0 (no streamed tables), 1 (streamed tables in * subtree) or 2 (a streamed table) */ - private int getOutputSize(Operator operator, - Set bigTables) { + private static int getOutputSize(Operator operator, + Collection bigTables) { // If a join operator contains a big subtree, there is a chance that its // output is also big, so the output size is 1 (medium) if (operator instanceof JoinOperator) { @@ -114,21 +115,8 @@ private int getOutputSize(Operator operator, * Set of all big tables */ private void reorder(JoinOperator joinOp, Set bigTables) { + int biggestPos = getBiggestPos(joinOp, bigTables); int count = joinOp.getParentOperators().size(); - - // Find the biggest reduce sink - int biggestPos = count - 1; - int biggestSize = getOutputSize( - joinOp.getParentOperators().get(biggestPos), bigTables); - for (int i = 0; i < count - 1; i++) { - int currSize = getOutputSize(joinOp.getParentOperators().get(i), - bigTables); - if (currSize > biggestSize) { - biggestSize = currSize; - biggestPos = i; - } - } - // Reorder tags if need be if (biggestPos != (count 
- 1)) { Byte[] tagOrder = joinOp.getConf().getTagOrder(); @@ -144,6 +132,21 @@ private void reorder(JoinOperator joinOp, Set bigTables) { } } + public static int getBiggestPos(JoinOperator joinOp, Collection bigTables) { + int count = joinOp.getParentOperators().size(); + // Find the biggest reduce sink + int biggestPos = count - 1; + int biggestSize = getOutputSize(joinOp.getParentOperators().get(biggestPos), bigTables); + for (int i = 0; i < count - 1; i++) { + int currSize = getOutputSize(joinOp.getParentOperators().get(i), bigTables); + if (currSize > biggestSize) { + biggestSize = currSize; + biggestPos = i; + } + } + return biggestPos; + } + /** * Transform the query tree. For each join, check which reduce sink will * output the biggest result (based on STREAMTABLE hints) and give it the diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index 0a58200..054c3a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -119,6 +119,9 @@ public void initialize(HiveConf hiveConf) { transformations.add(new ReduceSinkDeDuplication()); } transformations.add(new NonBlockingOpDeDupProc()); + if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_INLINE_SKEWJOIN)) { + transformations.add(new InlineSkewJoinOptimizer()); + } if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) { transformations.add(new GlobalLimitOptimizer()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g index b72ee5d..3bf8852 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g @@ -86,7 +86,7 @@ fromClause joinSource @init { gParent.pushMsg("join source", state); } @after { gParent.popMsg(state); } - : fromSource ( joinToken^ fromSource ( KW_ON! expression {$joinToken.start.getType() != COMMA}? )? )* + : fromSource ( joinToken^ fromSource ( KW_ON! expression (skewExpression)? {$joinToken.start.getType() != COMMA}? )? )* | uniqueJoinToken^ uniqueJoinSource (COMMA! uniqueJoinSource)+ ; @@ -122,6 +122,26 @@ joinToken | KW_LEFT KW_SEMI KW_JOIN -> TOK_LEFTSEMIJOIN ; +skewExpression +@init { gParent.msgs.push("skew expression"); } +@after { gParent.msgs.pop(); } + : KW_SKEWED KW_ON LPAREN skewKeyExprs RPAREN -> ^(TOK_SKEW skewKeyExprs); + +skewKeyExprs +@init { gParent.msgs.push("skew key expression list"); } +@after { gParent.msgs.pop(); } + : + skewKeyExpr (COMMA skewKeyExpr)* -> ^(TOK_EXPLIST skewKeyExpr+) + ; + +skewKeyExpr +@init { gParent.msgs.push("skew key expression"); } +@after { gParent.msgs.pop(); } + : + expression (KW_CLUSTER KW_BY numerator=Number KW_PERCENT)? + -> ^(TOK_SKEW_EXPR expression $numerator?) 
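+        // e.g. "a.key = 0 CLUSTER BY 20 PERCENT" or just "CAST(a.key as int) > 300" (see skewjoin_explicit.q);
+        // omitting CLUSTER BY assigns the skew group a single dedicated partition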
+ ; + lateralView @init {gParent.pushMsg("lateral view", state); } @after {gParent.popMsg(state); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index ef6d6f7..4895c1d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -92,6 +92,8 @@ TOK_RIGHTOUTERJOIN; TOK_FULLOUTERJOIN; TOK_UNIQUEJOIN; TOK_CROSSJOIN; +TOK_SKEW; +TOK_SKEW_EXPR; TOK_LOAD; TOK_EXPORT; TOK_IMPORT; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java index 94c563f..f50112e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java @@ -56,6 +56,12 @@ // join conditions private transient ArrayList> expressions; + // skew conditions + private transient ArrayList skewExprs; + + // skew percentage for a skew group, -1 for one partition + private transient ArrayList skewClusters; + // key index to nullsafe join flag private ArrayList nullsafes; @@ -363,4 +369,25 @@ public void addPostJoinFilter(ASTNode filter) { public List getPostJoinFilters() { return postJoinFilters; } + + public void setSkewExprs(ArrayList skewExprs) { + this.skewExprs = skewExprs; + } + + public ArrayList getSkewExprs() { + return skewExprs; + } + + public ArrayList getSkewClusters() { + return skewClusters; + } + + public void setSkewClusters(ArrayList skewClusters) { + this.skewClusters = skewClusters; + } + + public void clearSkewContexts() { + skewExprs = null; + skewClusters = null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 19110ce..7d4542c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -198,6 +198,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.plan.SkewContext; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; @@ -1095,7 +1096,7 @@ private boolean isOuterJoinToken(ASTNode node) { @SuppressWarnings("nls") private void processJoin(QB qb, ASTNode join) throws SemanticException { int numChildren = join.getChildCount(); - if ((numChildren != 2) && (numChildren != 3) + if ((numChildren != 2) && (numChildren != 3) && (numChildren != 4) && join.getToken().getType() != HiveParser.TOK_UNIQUEJOIN) { throw new SemanticException(generateErrorMessage(join, "Join with multiple children")); @@ -7386,6 +7387,11 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree, joinSrcOp, srcOps, omitOpts); joinContext.put(joinOp, joinTree); + ArrayList skewExprs = joinTree.getSkewExprs(); + if (skewExprs != null && !skewExprs.isEmpty()) { + handleSkews(joinTree, srcOps); + } + Operator op = joinOp; for(ASTNode condn : joinTree.getPostJoinFilters() ) { op = genFilterPlan(qb, condn, op); @@ -7393,6 +7399,91 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree, return op; } + // using transitivity of join condition, make cluster expression for other aliases + // by replacing part of skew expression with join counterpart. 
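+  // e.g. for "src a JOIN src b ON a.key = b.key SKEWED ON (a.key = 0 CLUSTER BY 20 PERCENT)"
+  // (the skewjoin_explicit.q pattern), the skew expression "a.key = 0" belongs to alias "a";
+  // for alias "b" it is rewritten to "b.key = 0" by substituting the join key a.key with its
+  // counterpart b.key, so both inputs route the same skewed keys into the same reducer group.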
+ private boolean handleSkews(QBJoinTree joinTree, Operator[] srcOps) throws SemanticException { + ArrayList skewExprs = joinTree.getSkewExprs(); + + // for each group, there is one driving alias. + // keys from driving alias are randomly distributed to reducers in the group, + // but those from other aliases are copied to all reducers in the group. + ArrayList[] skewDrivers = new ArrayList[srcOps.length]; + ArrayList[] skewKeysExprs = new ArrayList[srcOps.length]; + for (int i = 0; i < skewExprs.size(); i++) { + int srcPos = candidate(skewExprs.get(i), joinTree.getBaseSrc()); + if (srcPos < 0) { + // there is no table reference, which means it's possibly a constant expression + throw new SemanticException(ErrorMsg.SKEWEXPR_HAS_NO_TABLEALIAS.getMsg( + skewExprs.get(i).toStringTree())); + } + ReduceSinkOperator driver = (ReduceSinkOperator)srcOps[srcPos]; + List sources = driver.getConf().getKeyCols(); + + RowResolver rr = opParseCtx.get(driver.getParentOperators().get(0)).getRowResolver(); + ExprNodeDesc skewKey = genExprNodeDesc(skewExprs.get(i), rr); + // skew condition should return boolean type + if (!skewKey.getTypeString().equals("boolean")) { + throw new SemanticException(ErrorMsg.SKEWEXPR_IS_NOT_BOOLEAN_TYPE.format( + skewKey.getExprString(), skewKey.getTypeString())); + } + for (int pos = 0; pos < srcOps.length; pos++) { + if (skewKeysExprs[pos] == null) { + skewKeysExprs[pos] = new ArrayList(); + skewDrivers[pos] = new ArrayList(); + } + skewDrivers[pos].add(pos == srcPos); + if (pos == srcPos) { + skewKeysExprs[pos].add(skewKey); + continue; + } + ReduceSinkOperator input = (ReduceSinkOperator)srcOps[pos]; + List targets = input.getConf().getKeyCols(); + ExprNodeDesc replaced = ExprNodeDescUtils.replace(skewKey, sources, targets); + if (replaced == null) { + throw new SemanticException(ErrorMsg.SKEWEXPR_IS_NOT_FOUND_IN_JOIN_CONDITION.format( + skewKey.getExprString())); + } + skewKeysExprs[pos].add(replaced); + } + } + // hand over skew context to all RSs for the join + for (int pos = 0; pos < srcOps.length; pos++) { + SkewContext context = new SkewContext(); + context.setSkewKeys(skewKeysExprs[pos]); + context.setSkewDrivers(skewDrivers[pos]); + context.setSkewClusters(joinTree.getSkewClusters()); + ((ReduceSinkOperator) srcOps[pos]).getConf().setSkewContext(context); + } + return true; + } + + // returns first table alias for candidate + private int candidate(ASTNode node, String[] aliases) { + int candidate = -1; + if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) { + String table = node.getChild(0).getText().toLowerCase(); + candidate = indexOf(unescapeIdentifier(table), aliases); + } + if (candidate < 0 && node.getChildCount() > 0) { + for (Node child : node.getChildren()) { + candidate = candidate((ASTNode)child, aliases); + if (candidate >= 0) { + return candidate; + } + } + } + return candidate; + } + + private int indexOf(String target, String[] sources) { + for (int i = 0; i < sources.length; i++) { + if (target.equals(sources[i])) { + return i; + } + } + return -1; + } + /** * Construct a selection operator for semijoin that filter out all fields * other than the group by keys. 
@@ -7940,8 +8031,15 @@ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree, filtersForPushing.add(new ArrayList()); joinTree.setFiltersForPushing(filtersForPushing); + ASTNode skewCond; ASTNode joinCond = (ASTNode) joinParseTree.getChild(2); ArrayList leftSrc = new ArrayList(); + if (joinCond != null && joinCond.getToken().getType() == HiveParser.TOK_SKEW) { + skewCond = joinCond; + joinCond = null; + } else { + skewCond = (ASTNode) joinParseTree.getChild(3); + } parseJoinCondition(joinTree, joinCond, leftSrc, aliasToOpInfo); if (leftSrc.size() == 1) { joinTree.setLeftAlias(leftSrc.get(0)); @@ -7981,6 +8079,33 @@ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree, parseStreamTables(joinTree, qb); } + // extract explain skews for inner join (not sure for outer join if it's possible) + if (skewCond != null && condn[0].getJoinType() == JoinType.INNER) { + ASTNode exprList = (ASTNode) skewCond.getChild(0); + assert exprList.getToken().getType() == HiveParser.TOK_EXPLIST; + ArrayList skewCluster = new ArrayList(); + ArrayList skewExpressions = new ArrayList(); + + int percentsum = 0; + for (int i = 0; i < exprList.getChildCount(); i++) { + ASTNode child = (ASTNode) exprList.getChild(i); + skewExpressions.add((ASTNode) child.getChild(0)); + if (child.getChildCount() > 1) { + String numerator = unescapeIdentifier(child.getChild(1).getText()); + int percent = Double.valueOf(numerator).intValue(); + skewCluster.add(percent); + percentsum += percent; + } else { + skewCluster.add(-1); + } + } + if (percentsum >= 100) { + throw new SemanticException(ErrorMsg.SUM_OF_SKEW_CLUSTER_SIZE_IS_TOO_BIG.getMsg()); + } + joinTree.setSkewExprs(skewExpressions); + joinTree.setSkewClusters(skewCluster); + } + return joinTree; } @@ -8172,6 +8297,10 @@ private void mergeJoins(QB qb, QBJoinTree node, QBJoinTree target, int pos, int[ } target.setMapAliases(mapAliases); } + + // not yet supports merging skew context + node.clearSkewContexts(); + target.clearSkewContexts(); } private ObjectPair findMergePos(QBJoinTree node, QBJoinTree target) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index a808fc9..90de0f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.OpParseContext; @@ -595,4 +596,28 @@ public void setDoSplitsGrouping(boolean doSplitsGrouping) { public boolean getDoSplitsGrouping() { return this.doSplitsGrouping; } + + public boolean isUseInlineSkewContext() { + for (Operator operator : aliasToWork.values()) { + if (isUseInlineSkewContext(operator)) { + return true; + } + } + return false; + } + + private boolean isUseInlineSkewContext(Operator operator) { + if (operator instanceof ReduceSinkOperator && + ((ReduceSinkOperator)operator).getConf().getSkewContext() != null) { + return true; + } + if (operator.getChildOperators() != null && !operator.getChildOperators().isEmpty()) { + for (Operator child : operator.getChildOperators()) { + if (isUseInlineSkewContext(child)) { + return true; + } + } + } + return false; 
+ } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 2c4175a..7e1530b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -94,6 +94,7 @@ // Write type, since this needs to calculate buckets differently for updates and deletes private AcidUtils.Operation writeType; + private SkewContext skewContext; private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class); public ReduceSinkDesc() { @@ -377,4 +378,13 @@ public final void setAutoParallel(final boolean autoParallel) { public AcidUtils.Operation getWriteType() { return writeType; } + + @Explain(displayName = "Explicit Skew Context") + public SkewContext getSkewContext() { + return skewContext; + } + + public void setSkewContext(SkewContext skewContext) { + this.skewContext = skewContext; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SkewContext.java ql/src/java/org/apache/hadoop/hive/ql/plan/SkewContext.java new file mode 100644 index 0000000..a6e59e3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SkewContext.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Context for skewed RS + */ +public class SkewContext implements Serializable { + + private static final long serialVersionUID = 1L; + + private ArrayList skewKeys; + private ArrayList skewClusters; + private ArrayList skewDrivers; + + private transient ExprNodeEvaluator[] keyEvals; + private transient ObjectInspector[] keyObjectInspectors; + + private transient boolean[] drivers; + private transient int[] clusters; + + public void initialize() throws HiveException { + int i = 0; + keyEvals = new ExprNodeEvaluator[skewKeys.size()]; + for (ExprNodeDesc e : skewKeys) { + keyEvals[i++] = ExprNodeEvaluatorFactory.get(e); + } + drivers = getBooleanArray(skewDrivers); + clusters = getIntegerArray(skewClusters); + } + + public boolean initializeKey(HiveKey keyWritable, + ObjectInspector rowInspector, int numReducer) throws HiveException { + boolean runSkew = keyWritable.initializeSkew(numReducer, drivers, clusters); + if (runSkew) { + keyObjectInspectors = Operator.initEvaluators(keyEvals, rowInspector); + } + return runSkew; + } + + private boolean[] getBooleanArray(List booleans) { + boolean[] array = new boolean[booleans.size()]; + for ( int i = 0 ; i < array.length; i++) { + array[i] = booleans.get(i); + } + return array; + } + + private int[] getIntegerArray(List integers) { + int[] array = new int[integers.size()]; + for ( int i = 0 ; i < array.length; i++) { + array[i] = integers.get(i); + } + return array; + } + + @Explain(displayName = "key expressions") + public ArrayList getSkewKeys() { + return skewKeys; + } + + public void setSkewKeys(ArrayList skewKeys) { + this.skewKeys = skewKeys; + } + + @Explain(displayName = "cluster sizes") + public ArrayList getSkewClusters() { + return skewClusters; + } + + public void setSkewClusters(ArrayList skewClusters) { + this.skewClusters = skewClusters; + } + + @Explain(displayName = "drivers", normalExplain = false) + public ArrayList getSkewDrivers() { + return skewDrivers; + } + + public void setSkewDrivers(ArrayList skewDrivers) { + this.skewDrivers = skewDrivers; + } + + // Evaluate skew group and hashcode for the row + // If it's not skewed, return false for normal hashing + public boolean evaluateSkew(Object row, HiveKey hiveKey) throws HiveException { + for (int i = 0; i < keyEvals.length; i++) { + Object eval = keyEvals[i].evaluate(row); + if (eval != null && ((BooleanObjectInspector) keyObjectInspectors[i]).get(eval)) { + hiveKey.setSkewGroup(i); + return true; + } + } + hiveKey.setSkewGroup(-1); + return false; + } + + public boolean isDriver(HiveKey hiveKey) { + return drivers[hiveKey.getSkewGroup()]; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java index a8c875c..8059db4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java @@ -317,4 +317,12 @@ public 
double quantile(double q) { public int getNumBins() { return bins == null ? 0 : bins.size(); } + + public double[][] publish() { + double[][] published = new double[nusedbins][]; + for (int i = 0; i < published.length; i++) { + published[i] = new double[] {bins.get(i).x, bins.get(i).y}; + } + return published; + } } diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid1.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid1.q new file mode 100644 index 0000000..c1c52d8 --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid1.q @@ -0,0 +1,5 @@ +-- negative, constant expression +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + 0 = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid2.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid2.q new file mode 100644 index 0000000..f5e1399 --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid2.q @@ -0,0 +1,5 @@ +-- negative, skew expression is not containing join condition +explain select count(*) from src a JOIN src b ON a.key+1=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid3.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid3.q new file mode 100644 index 0000000..a7e6850 --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid3.q @@ -0,0 +1,3 @@ +-- negative, skew expression should be a boolean type +explain select count(*) from src a JOIN src b ON a.key+1=b.key SKEWED ON ( + a.key + 100 CLUSTER BY 20 PERCENT); diff --git ql/src/test/queries/clientnegative/skewjoin_explicit_invalid4.q ql/src/test/queries/clientnegative/skewjoin_explicit_invalid4.q new file mode 100644 index 0000000..81a2a1d --- /dev/null +++ ql/src/test/queries/clientnegative/skewjoin_explicit_invalid4.q @@ -0,0 +1,5 @@ +-- negative, sum of cluster size exceeds 99 percent +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 40 PERCENT, + b.key = 100 CLUSTER BY 40 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); diff --git ql/src/test/queries/clientpositive/skewjoin_explicit.q ql/src/test/queries/clientpositive/skewjoin_explicit.q new file mode 100644 index 0000000..d797535 --- /dev/null +++ ql/src/test/queries/clientpositive/skewjoin_explicit.q @@ -0,0 +1,107 @@ +set hive.exec.reducers.bytes.per.reducer=600; + +-- SORT_AND_HASH_QUERY_RESULTS + +-- simple (one partition for each cluster) +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300); + +select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300); + +-- percent based assigning +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +-- subquery +explain select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + 
b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT); + +-- nested skewjoin +explain +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT); + +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT); + +set hive.auto.convert.join = true; +set hive.auto.convert.join.noconditionaltask = false; + +-- auto-convert (map-join) +explain +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key; + +-- auto-convert (map-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key; + +set hive.mapjoin.smalltable.filesize=100; + +-- auto-convert (common-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key; + +reset; +set hive.exec.reducers.bytes.per.reducer=3000; + +-- negative, 3 reducers for 5 groups (4 skewed +1 non-skewed), disabled at runtime +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT); + +select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT); diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid1.q.out ql/src/test/results/clientnegative/skewjoin_explicit_invalid1.q.out new file mode 100644 index 0000000..ae08dad --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid1.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10298]: Failed to find alias for skew expression (= 0 0) diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid2.q.out ql/src/test/results/clientnegative/skewjoin_explicit_invalid2.q.out new file mode 100644 index 0000000..a699cd7 --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid2.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10300]: Skew expression (key = 0) is not found in join condition diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid3.q.out 
ql/src/test/results/clientnegative/skewjoin_explicit_invalid3.q.out new file mode 100644 index 0000000..7f36080 --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid3.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10299]: Skew expression (key + 100) has double type, which should be a boolean type diff --git ql/src/test/results/clientnegative/skewjoin_explicit_invalid4.q.out ql/src/test/results/clientnegative/skewjoin_explicit_invalid4.q.out new file mode 100644 index 0000000..ac2f877 --- /dev/null +++ ql/src/test/results/clientnegative/skewjoin_explicit_invalid4.q.out @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10301]: Sum of skew cluster size is too big. diff --git ql/src/test/results/clientpositive/skewjoin_explicit.q.out ql/src/test/results/clientpositive/skewjoin_explicit.q.out new file mode 100644 index 0000000..5435ceb --- /dev/null +++ ql/src/test/results/clientpositive/skewjoin_explicit.q.out @@ -0,0 +1,1101 @@ +PREHOOK: query: -- SORT_AND_HASH_QUERY_RESULTS + +-- simple (one partition for each cluster) +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +PREHOOK: type: QUERY +POSTHOOK: query: -- SORT_AND_HASH_QUERY_RESULTS + +-- simple (one partition for each cluster) +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: -1, -1, -1 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: -1, -1, -1 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + 
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0, b.key = 100, CAST(a.key as int) > 300) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +OAZzSyVsJ+Qewsa/+ibZ5w== +PREHOOK: query: -- percent based assigning +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- percent based assigning +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + 
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +OAZzSyVsJ+Qewsa/+ibZ5w== +PREHOOK: query: -- subquery +explain select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- subquery +explain select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce 
Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + TableScan + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Explicit Skew Context: + cluster sizes: 20, 20, 40 + key expressions: + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was 
here #### +POSTHOOK: query: select count(*) from (select distinct key from src) a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 20 PERCENT, + b.key = 100 CLUSTER BY 20 PERCENT, + cast(a.key as int) > 300 CLUSTER BY 40 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +500 +zuYxEhwuySMvOi8CitXImw== +PREHOOK: query: -- nested skewjoin +explain +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- nested skewjoin +explain +select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1, Stage-4 + Stage-4 is a root stage + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key < 70)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key < 70)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + value expressions: value (type: string) + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} {VALUE._col0} + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + key 
expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + TableScan + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} {VALUE._col0} + 1 {KEY.reducesinkkey0} {VALUE._col0} + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 100 Data size: 1065 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 100 Data size: 1065 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 100 Data size: 1065 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-4 + Map Reduce + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key > 50)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + value expressions: value (type: string) + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key > 50)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} {VALUE._col0} + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select * from + 
(select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * from + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 10 CLUSTER BY 20 PERCENT, + b.key < 20 CLUSTER BY 20 PERCENT) where a.key < 70) X + JOIN + (select a.* from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key < 60 CLUSTER BY 20 PERCENT, + b.key < 70 CLUSTER BY 20 PERCENT) where a.key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +51 val_51 51 val_51 +53 val_53 53 val_53 +54 val_54 54 val_54 +57 val_57 57 val_57 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +58 val_58 58 val_58 +64 val_64 64 val_64 +65 val_65 65 val_65 +66 val_66 66 val_66 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +67 val_67 67 val_67 +69 val_69 69 val_69 +/9J5SFPLb7fHqfUyR1DD4A== +PREHOOK: query: -- auto-convert (map-join) +explain +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +PREHOOK: type: QUERY +POSTHOOK: query: -- auto-convert (map-join) +explain +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-6 is a root stage , consists of Stage-7, Stage-8, Stage-1 + Stage-7 has a backup stage: Stage-1 + Stage-4 depends on stages: Stage-7 + Stage-2 depends on stages: Stage-1, Stage-4, Stage-5 + Stage-8 has a backup stage: Stage-1 + Stage-5 depends on stages: Stage-8 + Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-6 + Conditional Operator + + Stage: Stage-7 + Map Reduce Local Work + Alias -> Map Local Tables: + y:src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + y:src + TableScan + alias: src + Filter Operator + predicate: ((key > 50) and key is not null) (type: boolean) + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + 
HashTable Sink Operator + condition expressions: + 0 {_col0} {_col1} + 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + + Stage: Stage-4 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Filter Operator + predicate: ((key < 70) and key is not null) (type: boolean) + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-8 + Map Reduce Local Work + Alias -> Map Local Tables: + x:src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + x:src + TableScan + alias: src + Filter Operator + predicate: ((key < 70) and key is not null) (type: boolean) + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + HashTable Sink Operator + condition expressions: + 0 {_col1} + 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + + Stage: Stage-5 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Filter Operator + predicate: ((key > 50) and key is not null) (type: boolean) + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} {_col1} + 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((key < 70) and key is not null) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value 
(type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((key > 50) and key is not null) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Explicit Skew Context: + cluster sizes: 20, 20 + key expressions: + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} {VALUE._col0} + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: -- auto-convert (map-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: -- auto-convert (map-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +51 val_51 +51 val_51 +51 val_51 +51 val_51 +53 val_53 +54 val_54 +57 val_57 +58 val_58 +58 val_58 +58 val_58 +58 val_58 +64 val_64 +65 val_65 +66 val_66 +67 val_67 +67 val_67 +67 val_67 +67 val_67 +69 val_69 +U7teTw/+GPp9VD0pQK2DoQ== +PREHOOK: query: -- auto-convert (common-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: -- auto-convert (common-join) +select X.* from + (select * from src where key < 70) X + JOIN + (select * from src where key > 50) Y + ON X.key=Y.key SKEWED ON ( + X.key < 55 CLUSTER BY 20 PERCENT, + Y.key > 65 CLUSTER BY 20 PERCENT) order by X.key +POSTHOOK: type: QUERY 
+POSTHOOK: Input: default@src +#### A masked pattern was here #### +51 val_51 +51 val_51 +51 val_51 +51 val_51 +53 val_53 +54 val_54 +57 val_57 +58 val_58 +58 val_58 +58 val_58 +58 val_58 +64 val_64 +65 val_65 +66 val_66 +67 val_67 +67 val_67 +67 val_67 +67 val_67 +69 val_69 +U7teTw/+GPp9VD0pQK2DoQ== +PREHOOK: query: -- negative, 3 reducers for 5 groups (4 skewed +1 non-skewed), disabled at runtime +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +PREHOOK: type: QUERY +POSTHOOK: query: -- negative, 3 reducers for 5 groups (4 skewed +1 non-skewed), disabled at runtime +explain select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 10, 10, 10, 10 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: a + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Explicit Skew Context: + cluster sizes: 10, 10, 10, 10 + key expressions: + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from src a JOIN src b ON a.key=b.key SKEWED ON ( + a.key = 0 CLUSTER BY 10 PERCENT, + a.key = 10 CLUSTER BY 10 PERCENT, + a.key = 20 CLUSTER BY 10 PERCENT, + b.key = 30 CLUSTER BY 10 PERCENT) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1028 +OAZzSyVsJ+Qewsa/+ibZ5w==
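
Note on the "Explicit Skew Context: cluster sizes" lines in the plans above: clusters declared with CLUSTER BY n PERCENT show their percentage (e.g. 20, 20, 40), while clusters declared without an explicit size show -1. The Java sketch below is only an illustration of how such percentages could be mapped onto contiguous reducer ranges, with the leftover reducers serving non-skewed keys; the class and method names (SkewClusterSketch, reducerFor) are hypothetical and are not the code added by this patch, and the overflow check is merely an assumed analogue of the "Sum of skew cluster size is too big." failure seen in the negative test.

// Illustrative sketch only -- not the skew-context implementation from this patch.
// Maps CLUSTER BY ... PERCENT sizes onto contiguous reducer ranges and leaves
// the remaining reducers for non-skewed keys.
public class SkewClusterSketch {
  private final int[] clusterStart;   // first reducer owned by each skew cluster
  private final int[] clusterWidth;   // number of reducers owned by each skew cluster
  private final int defaultStart;     // first reducer reserved for non-skewed keys
  private final int numReducers;

  public SkewClusterSketch(int[] percents, int numReducers) {
    this.numReducers = numReducers;
    this.clusterStart = new int[percents.length];
    this.clusterWidth = new int[percents.length];
    int next = 0;
    for (int i = 0; i < percents.length; i++) {
      // -1 (no PERCENT given) is treated here as a single reducer per cluster
      int width = percents[i] < 0 ? 1 : Math.max(1, numReducers * percents[i] / 100);
      clusterStart[i] = next;
      clusterWidth[i] = width;
      next += width;
    }
    this.defaultStart = next;
    if (defaultStart >= numReducers) {
      // assumed analogue of the compile-time error: nothing left for non-skewed rows
      throw new IllegalArgumentException("Sum of skew cluster size is too big.");
    }
  }

  /** cluster = index of the matched skew expression, or -1 for non-skewed rows. */
  public int reducerFor(int cluster, int keyHash) {
    int nonNegative = keyHash & Integer.MAX_VALUE;
    if (cluster < 0) {
      return defaultStart + nonNegative % (numReducers - defaultStart);
    }
    return clusterStart[cluster] + nonNegative % clusterWidth[cluster];
  }
}

For example, with 10 reducers and percents {20, 20, 40}, the three skew clusters would own reducers 0-1, 2-3 and 4-7, and non-skewed keys would hash over reducers 8-9.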
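
A second, equally hypothetical sketch of the runtime guard implied by the last ("negative") test case above, where four skewed clusters plus the non-skewed group cannot be placed on three reducers and the explicit skew handling is simply disabled for the query. The method name canUseExplicitSkew is an assumption for illustration only.

// Illustrative sketch only -- not the guard added by this patch.
public class SkewRuntimeGuardSketch {
  // Hypothetical rule: each skew expression needs at least one dedicated reducer,
  // plus one reducer for all non-skewed keys; otherwise fall back to the plain join.
  static boolean canUseExplicitSkew(int numSkewClusters, int numReducers) {
    return numReducers >= numSkewClusters + 1;
  }

  public static void main(String[] args) {
    // The negative test above: 4 skew expressions, 3 reducers -> handling disabled.
    System.out.println(canUseExplicitSkew(4, 3)); // false
  }
}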