diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d1e6631975..dea175480a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2282,6 +2282,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" +
         "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" +
         "to be set to true. Tez only."),
+    HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE("hive.optimize.shared.work.mapjoin.cache.reuse", true,
+        "When shared work optimizer is enabled, whether we should reuse the cache for the broadcast side\n" +
+        "of mapjoin operators that share the same broadcast input. Requires hive.optimize.shared.work\n" +
+        "to be set to true. Tez only."),
     HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
         "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " +
         "work objects and combines them if they meet certain preconditions. Spark only."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 114cea91d6..da1dd426c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -183,7 +183,16 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     // On Tez only: The hash map might already be cached in the container we run
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
-    cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
+    // The cacheKey may have already been defined in the MapJoin conf spec
+    // as part of the Shared Work Optimization if it can be reused among
+    // multiple mapjoin operators. In that case, we take that key from conf
+    // and append this.getClass().getName() to disambiguate between different
+    // classes that may be using the same source data, e.g.
+    // VectorMapJoinInnerGenerateResultOperator and VectorMapJoinLeftSemiLongOperator.
+    // If the cacheKey is not defined in the conf, then we generate it.
+    cacheKey = conf.getCacheKey() == null ?
+        MapJoinDesc.generateCacheKey(this.getOperatorId()) :
+        conf.getCacheKey() + "_" + this.getClass().getName();
     cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);
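To make the key-selection logic above concrete, here is a minimal, self-contained Java sketch; DescStub, InnerJoinOp, SemiJoinOp, and the operator ids are hypothetical stand-ins rather than Hive classes. It shows how two different operator classes that share one conf-level key still resolve to distinct container-cache entries, while an operator without a shared key falls back to its own id-based key, i.e. the pre-patch behavior.

// Stand-alone sketch of the cacheKey selection above; all types are hypothetical stubs.
public class CacheKeyDemo {

  // Stand-in for the relevant slice of MapJoinDesc.
  static final class DescStub {
    private final String cacheKey; // null when no shared key was assigned
    DescStub(String cacheKey) { this.cacheKey = cacheKey; }
    String getCacheKey() { return cacheKey; }
    static String generateCacheKey(String operatorId) {
      return "HASH_MAP_" + operatorId + "_container";
    }
  }

  // Hypothetical operator classes that could consume the same broadcast input.
  static final class InnerJoinOp { }
  static final class SemiJoinOp { }

  // Mirrors the ternary introduced in MapJoinOperator.initializeOp.
  static String resolveCacheKey(DescStub conf, String operatorId, Class<?> opClass) {
    return conf.getCacheKey() == null
        ? DescStub.generateCacheKey(operatorId)          // per-operator fallback
        : conf.getCacheKey() + "_" + opClass.getName();  // shared key + class suffix
  }

  public static void main(String[] args) {
    DescStub shared = new DescStub("HASH_MAP_MAPJOIN_25_container");
    // Same shared key, different classes -> distinct cache entries:
    System.out.println(resolveCacheKey(shared, "MAPJOIN_30", InnerJoinOp.class));
    System.out.println(resolveCacheKey(shared, "MAPJOIN_31", SemiJoinOp.class));
    // No shared key in the conf -> id-based key, as before the patch:
    System.out.println(resolveCacheKey(new DescStub(null), "MAPJOIN_30", InnerJoinOp.class));
  }
}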
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index 257375473a..1e3887b157 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -508,6 +509,58 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       }
     }
 
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
+      // Try to reuse cache for broadcast side in mapjoin operators that
+      // share the same input.
+      // First we group together all the mapjoin operators that share the same
+      // reduce sink operator.
+      final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators =
+          ArrayListMultimap.create();
+      final Set<Operator<?>> visitedOperators = new HashSet<>();
+      for (Entry<Operator<?>, Collection<Operator<?>>> e :
+          optimizerCache.operatorToWorkOperators.asMap().entrySet()) {
+        if (visitedOperators.contains(e.getKey())) {
+          // Already visited this work, we move on
+          continue;
+        }
+        for (Operator<?> op : e.getValue()) {
+          if (op instanceof MapJoinOperator) {
+            MapJoinOperator mapJoinOp = (MapJoinOperator) op;
+            // Only allowed for mapjoin operator
+            if (!mapJoinOp.getConf().isBucketMapJoin() &&
+                !mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+              parentToMapJoinOperators.put(
+                  obtainBroadcastInput(mapJoinOp).getParentOperators().get(0), mapJoinOp);
+            }
+          }
+          visitedOperators.add(op);
+        }
+      }
+      // For each group, set the cache key accordingly if there is more than one operator
+      // and the input RS operators are equal
+      for (Collection<MapJoinOperator> c : parentToMapJoinOperators.asMap().values()) {
+        Map<ReduceSinkOperator, String> rsOpToCacheKey = new HashMap<>();
+        for (MapJoinOperator mapJoinOp : c) {
+          ReduceSinkOperator rsOp = obtainBroadcastInput(mapJoinOp);
+          String cacheKey = null;
+          for (Entry<ReduceSinkOperator, String> e : rsOpToCacheKey.entrySet()) {
+            if (compareOperator(pctx, rsOp, e.getKey())) {
+              cacheKey = e.getValue();
+              break;
+            }
+          }
+          if (cacheKey == null) {
+            // Either it is the first map join operator or there was no equivalent RS,
+            // hence generate cache key
+            cacheKey = MapJoinDesc.generateCacheKey(mapJoinOp.getOperatorId());
+            rsOpToCacheKey.put(rsOp, cacheKey);
+          }
+          // Set in the conf of the map join operator
+          mapJoinOp.getConf().setCacheKey(cacheKey);
+        }
+      }
+    }
+
     // If we are running tests, we are going to verify that the contents of the cache
     // correspond with the contents of the plan, and otherwise we fail.
     // This check always run when we are running in test mode, independently on whether
@@ -534,6 +587,15 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     return pctx;
   }
 
+  /**
+   * Obtain the RS input for a mapjoin operator.
+   */
+  private static ReduceSinkOperator obtainBroadcastInput(MapJoinOperator mapJoinOp) {
+    return mapJoinOp.getParentOperators().get(0) instanceof ReduceSinkOperator ?
+        (ReduceSinkOperator) mapJoinOp.getParentOperators().get(0) :
+        (ReduceSinkOperator) mapJoinOp.getParentOperators().get(1);
+  }
+
   /**
    * This method gathers the TS operators with DPP from the context and
    * stores them into the input optimization cache.
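The optimizer pass above is easier to follow outside diff form. Below is a hedged, stand-alone sketch of the same two-phase idea: group mapjoins by the parent of their broadcast-side RS, then give equivalent RS operators within a group the same generated key. MapJoinStub and the rsSignature strings are illustrative stand-ins for the real operators and for compareOperator(), not Hive classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch; all types and ids are hypothetical.
public class CacheKeyAssignmentDemo {

  // Stand-in for a mapjoin: its id, the grouping parent of its broadcast-side
  // RS, and a signature string that plays the role of compareOperator().
  static final class MapJoinStub {
    final String operatorId;
    final String rsParent;
    final String rsSignature;
    MapJoinStub(String operatorId, String rsParent, String rsSignature) {
      this.operatorId = operatorId;
      this.rsParent = rsParent;
      this.rsSignature = rsSignature;
    }
  }

  static Map<String, String> assignCacheKeys(List<MapJoinStub> mapJoins) {
    Map<String, String> opToKey = new HashMap<>();
    // Phase 1: group by the parent operator of the broadcast input.
    Map<String, List<MapJoinStub>> groups = new HashMap<>();
    for (MapJoinStub mj : mapJoins) {
      groups.computeIfAbsent(mj.rsParent, k -> new ArrayList<>()).add(mj);
    }
    // Phase 2: within a group, equivalent RS operators share one generated key.
    for (List<MapJoinStub> group : groups.values()) {
      Map<String, String> rsSignatureToKey = new HashMap<>();
      for (MapJoinStub mj : group) {
        String key = rsSignatureToKey.computeIfAbsent(
            mj.rsSignature, sig -> "HASH_MAP_" + mj.operatorId + "_container");
        opToKey.put(mj.operatorId, key);
      }
    }
    return opToKey;
  }

  public static void main(String[] args) {
    List<MapJoinStub> plan = new ArrayList<>();
    plan.add(new MapJoinStub("MAPJOIN_30", "SEL_10", "keys:[col0]"));
    plan.add(new MapJoinStub("MAPJOIN_31", "SEL_10", "keys:[col0]")); // equivalent RS -> reuses key
    plan.add(new MapJoinStub("MAPJOIN_32", "SEL_10", "keys:[col1]")); // same parent, different RS
    plan.add(new MapJoinStub("MAPJOIN_33", "SEL_99", "keys:[col0]")); // different parent
    for (Map.Entry<String, String> e : assignCacheKeys(plan).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}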
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 8ba5101326..507114b700 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -88,6 +88,8 @@
   private boolean isHybridHashJoin;
   private boolean isDynamicPartitionHashJoin = false;
 
+  private String cacheKey;
+
   public MapJoinDesc() {
     bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
   }
@@ -111,6 +113,7 @@ public MapJoinDesc(MapJoinDesc clone) {
     this.parentDataSizes = clone.parentDataSizes;
     this.isBucketMapJoin = clone.isBucketMapJoin;
     this.isHybridHashJoin = clone.isHybridHashJoin;
+    this.cacheKey = clone.cacheKey;
   }
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -128,6 +131,7 @@ public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
     this.bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
     this.dumpFilePrefix = dumpFilePrefix;
     this.inMemoryDataSize = inMemoryDataSize;
+    this.cacheKey = null;
     initRetainExprList();
   }
 
@@ -478,6 +482,18 @@ public String getDebugOuterFilterMapString() {
     return Arrays.deepToString(fm);
   }
 
+  public String getCacheKey() {
+    return cacheKey;
+  }
+
+  public void setCacheKey(String cacheKey) {
+    this.cacheKey = cacheKey;
+  }
+
+  public static String generateCacheKey(String operatorId) {
+    return "HASH_MAP_" + operatorId + "_container";
+  }
+
   // Use LinkedHashSet to give predictable display order.
   private static final Set<String> vectorizableMapJoinNativeEngines =
       new LinkedHashSet<String>(Arrays.asList("tez", "spark"));
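Finally, a short usage sketch of the new MapJoinDesc accessors. It assumes hive-exec with this patch on the classpath, and the operator id is illustrative. Note that generateCacheKey produces the same HASH_MAP_<id>_container shape as the inline literal it replaces in MapJoinOperator, so plans without shared work keep their previous cache keys.

import org.apache.hadoop.hive.ql.plan.MapJoinDesc;

public class MapJoinDescKeyDemo {
  public static void main(String[] args) {
    MapJoinDesc desc = new MapJoinDesc();
    // Default: no shared key, so MapJoinOperator falls back to its own id-based key.
    System.out.println(desc.getCacheKey()); // null
    // What SharedWorkOptimizer does for a group of equivalent broadcast inputs:
    desc.setCacheKey(MapJoinDesc.generateCacheKey("MAPJOIN_25"));
    System.out.println(desc.getCacheKey()); // HASH_MAP_MAPJOIN_25_container
  }
}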