diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 44cbc63..29d895a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -117,6 +117,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, int numBuckets = -1; EdgeType edgeType = EdgeType.BROADCAST_EDGE; if (mapJoinOp.getConf().isBucketMapJoin()) { + + // disable auto parallelism for bucket map joins + parentRS.getConf().setAutoParallel(false); + numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0]; if (mapJoinOp.getConf().getCustomBucketMapJoin()) { edgeType = EdgeType.CUSTOM_EDGE; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 8c1d336..5db8111 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -87,7 +87,7 @@ private float topNMemoryUsage = -1; private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable - private boolean autoParallel = false; // Is reducer parallelism automatic or fixed + private Boolean autoParallel = null; // Is reducer auto-parallelism enabled, disabled or unset private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class); public ReduceSinkDesc() { @@ -140,7 +140,7 @@ public Object clone() { desc.setBucketCols(bucketCols); desc.setStatistics(this.getStatistics()); desc.setSkipTag(skipTag); - desc.setAutoParallel(autoParallel); + desc.autoParallel = autoParallel; return desc; } @@ -344,10 +344,16 @@ public boolean getSkipTag() { } public final boolean isAutoParallel() { - return 
autoParallel; + return (autoParallel != null) && autoParallel; } public final void setAutoParallel(final boolean autoParallel) { - this.autoParallel = autoParallel; + // we don't allow turning on auto parallel once it has been + // explicitly turned off. That is to avoid scenarios where + // auto parallelism could break assumptions about number of + // reducers or hash function. + if (this.autoParallel == null || this.autoParallel == true) { + this.autoParallel = autoParallel; + } } }