diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a561e5771..994656c207 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2267,6 +2267,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" +
         "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" +
         "to be set to true. Tez only."),
+    HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE("hive.optimize.shared.work.mapjoin.cache.reuse", true,
+        "When shared work optimizer is enabled, whether we should reuse the cache for the broadcast side\n" +
+        "of mapjoin operators that share same broadcast input. Requires hive.optimize.shared.work\n" +
+        "to be set to true. Tez only."),
     HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
         "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " +
         "work objects and combines them if they meet certain preconditions. Spark only."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 114cea91d6..2c9a364e5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -183,7 +183,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     // On Tez only: The hash map might already be cached in the container we run
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
-    cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
+    // The cacheKey may have already been defined in the MapJoin conf spec
+    // as part of the Shared Work Optimization if it can be reused among
+    // multiple mapjoin operators. If it is not defined, then we generate it.
+    cacheKey = conf.getCacheKey() == null ?
+        MapJoinDesc.generateCacheKey(this.getOperatorId()) :
+        conf.getCacheKey();
     cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index 257375473a..5a7fb0678a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -508,6 +509,46 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       }
     }

+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
+      // Try to reuse cache for broadcast side in mapjoin operators that
+      // share same input.
+      // First we group together all the mapjoin operators that share same
+      // reduce sink operator.
+      final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators =
+          ArrayListMultimap.create();
+      final Set<Operator<?>> visitedOperators = new HashSet<>();
+      for (Entry<Operator<?>, Collection<Operator<?>>> e :
+          optimizerCache.operatorToWorkOperators.asMap().entrySet()) {
+        if (visitedOperators.contains(e.getKey())) {
+          // Already visited this work, we move on
+          continue;
+        }
+        for (Operator<?> op : e.getValue()) {
+          if (op instanceof MapJoinOperator) {
+            MapJoinOperator mapJoinOp = (MapJoinOperator) op;
+            ReduceSinkOperator rsOp =
+                mapJoinOp.getParentOperators().get(0) instanceof ReduceSinkOperator ?
+                    (ReduceSinkOperator) mapJoinOp.getParentOperators().get(0) :
+                    (ReduceSinkOperator) mapJoinOp.getParentOperators().get(1);
+            parentToMapJoinOperators.put(rsOp.getParentOperators().get(0), mapJoinOp);
+          }
+          visitedOperators.add(op);
+        }
+      }
+      // For each group, set the cache key accordingly if there is more than one operator
+      for (Collection<MapJoinOperator> c : parentToMapJoinOperators.asMap().values()) {
+        String cacheKey = null;
+        for (MapJoinOperator mapJoinOp : c) {
+          if (cacheKey == null) {
+            // Generate at the first map join operator
+            cacheKey = MapJoinDesc.generateCacheKey(mapJoinOp.getOperatorId());
+          }
+          // Set in the conf of the map join operator
+          mapJoinOp.getConf().setCacheKey(cacheKey);
+        }
+      }
+    }
+
     // If we are running tests, we are going to verify that the contents of the cache
     // correspond with the contents of the plan, and otherwise we fail.
     // This check always run when we are running in test mode, independently on whether
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 8ba5101326..ff4f4783d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -88,6 +88,8 @@
   private boolean isHybridHashJoin;
   private boolean isDynamicPartitionHashJoin = false;

+  private String cacheKey;
+
   public MapJoinDesc() {
     bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
   }
@@ -111,14 +113,15 @@ public MapJoinDesc(MapJoinDesc clone) {
     this.parentDataSizes = clone.parentDataSizes;
     this.isBucketMapJoin = clone.isBucketMapJoin;
     this.isHybridHashJoin = clone.isHybridHashJoin;
+    this.cacheKey = clone.cacheKey;
   }

   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
-    final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values,
-    final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames,
-    final int posBigTable, final JoinCondDesc[] conds,
-    final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix,
-    final MemoryMonitorInfo memoryMonitorInfo, final long inMemoryDataSize) {
+      final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values,
+      final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames,
+      final int posBigTable, final JoinCondDesc[] conds,
+      final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix,
+      final MemoryMonitorInfo memoryMonitorInfo, final long inMemoryDataSize) {
     super(values, outputColumnNames, noOuterJoin, conds, filters, null, memoryMonitorInfo);
     this.keys = keys;
     this.keyTblDesc = keyTblDesc;
@@ -128,6 +131,7 @@ public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
     this.bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
     this.dumpFilePrefix = dumpFilePrefix;
     this.inMemoryDataSize = inMemoryDataSize;
+    this.cacheKey = null;
     initRetainExprList();
   }

@@ -478,6 +482,18 @@ public String getDebugOuterFilterMapString() {
     return Arrays.deepToString(fm);
   }

+  public String getCacheKey() {
+    return cacheKey;
+  }
+
+  public void setCacheKey(String cacheKey) {
+    this.cacheKey = cacheKey;
+  }
+
+  public static String generateCacheKey(String operatorId) {
+    return "HASH_MAP_" + operatorId + "_container";
+  }
+
   // Use LinkedHashSet to give predictable display order.
   private static final Set<String> vectorizableMapJoinNativeEngines =
       new LinkedHashSet<String>(Arrays.asList("tez", "spark"));
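
Note on the SharedWorkOptimizer hunk above: the pass groups map join operators by the operator that feeds their broadcast-side ReduceSinkOperator, then assigns one cache key per group, generated from the first operator's id. A minimal standalone sketch of that grouping follows; class and operator names (SharedCacheKeySketch, TS_3, MAPJOIN_25, ...) are illustrative, not Hive's classes, and a plain Map of lists stands in for Guava's Multimap.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SharedCacheKeySketch {

  static String generateCacheKey(String operatorId) {
    // Same key format that MapJoinDesc.generateCacheKey uses in the patch.
    return "HASH_MAP_" + operatorId + "_container";
  }

  public static void main(String[] args) {
    // Hypothetical plan: MAPJOIN_25 and MAPJOIN_31 consume the same broadcast
    // input (the RS parent TS_3); MAPJOIN_40 consumes a different one (TS_7).
    Map<String, List<String>> broadcastParentToMapJoins = new LinkedHashMap<>();
    broadcastParentToMapJoins
        .computeIfAbsent("TS_3", k -> new ArrayList<>()).add("MAPJOIN_25");
    broadcastParentToMapJoins
        .computeIfAbsent("TS_3", k -> new ArrayList<>()).add("MAPJOIN_31");
    broadcastParentToMapJoins
        .computeIfAbsent("TS_7", k -> new ArrayList<>()).add("MAPJOIN_40");

    // One key per group, generated from the first operator in the group,
    // mirroring the loop over parentToMapJoinOperators.asMap().values().
    for (List<String> group : broadcastParentToMapJoins.values()) {
      String cacheKey = generateCacheKey(group.get(0));
      for (String mapJoinId : group) {
        System.out.println(mapJoinId + " -> " + cacheKey);
      }
    }
    // MAPJOIN_25 and MAPJOIN_31 both print HASH_MAP_MAPJOIN_25_container,
    // while MAPJOIN_40 keeps its own key.
  }
}
```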
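Note on the runtime effect in the MapJoinOperator hunk: once two operators carry the same cacheKey, the per-container cache returned by ObjectCacheFactory.getCache can hand the same broadcast hash table to both, so it is loaded once per container rather than once per operator. A minimal sketch of those assumed cache semantics, with ConcurrentHashMap standing in for Hive's ObjectCache rather than reproducing its API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheReuseSketch {

  // Stand-in for the per-container object cache, keyed by cacheKey.
  static final Map<String, Object> CACHE = new ConcurrentHashMap<>();
  static final AtomicInteger LOADS = new AtomicInteger();

  static Object retrieve(String cacheKey) {
    // computeIfAbsent runs the loader only on a cache miss.
    return CACHE.computeIfAbsent(cacheKey, k -> {
      LOADS.incrementAndGet();   // expensive broadcast hash table load
      return new Object();       // stand-in for the loaded hash table
    });
  }

  public static void main(String[] args) {
    String sharedKey = "HASH_MAP_MAPJOIN_25_container";
    Object first = retrieve(sharedKey);   // builds the hash table
    Object second = retrieve(sharedKey);  // cache hit, no second load
    System.out.println("loads=" + LOADS.get() + ", same=" + (first == second));
    // Prints: loads=1, same=true
  }
}
```

This is also why the optimizer, not the operator, chooses the key: only a plan-level pass can see that two map joins share a broadcast input, while each operator in isolation would generate a distinct per-operator key.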