diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 4e0c50a662..51408b1c77 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -22,6 +22,8 @@
   public static final String LLAP_LOGGER_NAME_QUERY_ROUTING = "query-routing";
   public static final String LLAP_LOGGER_NAME_CONSOLE = "console";
   public static final String LLAP_LOGGER_NAME_RFA = "RFA";
+  public static final String LLAP_NUM_BUCKETS = "llap.num.buckets";
+  public static final String LLAP_BUCKET_ID = "llap.bucket.id";
 
   /* Constants for Druid storage handler */
   public static final String DRUID_HIVE_STORAGE_HANDLER_ID =
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0c2cf05715..12a7c033a4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3027,7 +3027,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ("hive.convert.join.bucket.mapjoin.tez", false,
         "Whether joins can be automatically converted to bucket map joins in hive \n" +
         "when tez is used as the execution engine."),
-
+    HIVE_TEZ_BMJ_USE_SUBCACHE("hive.tez.bmj.use.subcache", true,
+        "Use subcache to reuse hashtable across multiple tasks"),
     HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true,
         "Check if a plan contains a Cross Product. If there is one, output a warning to the Session's console."),
     HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", "5000ms",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index fe7c90c0a4..f45a0123dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -22,13 +22,14 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -49,6 +50,8 @@
 import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
+import org.apache.hadoop.hive.ql.exec.tez.LlapObjectSubCache;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -103,6 +106,10 @@
   // Only this table has spilled big table rows
   protected transient boolean isTestingNoHashTableLoad;
+  // Only used in bucket map join.
+  private transient int numBuckets = -1;
+  private transient int bucketId = -1;
+  private transient ReentrantLock subCacheLock = new ReentrantLock();
 
   /** Kryo ctor. */
   protected MapJoinOperator() {
@@ -156,6 +163,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);
+    bucketId = hconf.getInt(Constants.LLAP_BUCKET_ID, -1);
+    numBuckets = hconf.getInt(Constants.LLAP_NUM_BUCKETS, -1);
+
     hashMapRowGetters = null;
 
     mapJoinTables = new MapJoinTableContainer[tagLen];
@@ -193,14 +203,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       Future<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>> future = cache.retrieveAsync(
-          cacheKey,
-          new Callable<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>() {
-            @Override
-            public Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> call()
-                throws HiveException {
-              return loadHashTable(mapContext, mrContext);
-            }
-          });
+          cacheKey, () -> loadHashTable(mapContext, mrContext));
       asyncInitOperations.add(future);
     } else if (!isInputFileChangeSensitive(mapContext)) {
       loadHashTable(mapContext, mrContext);
@@ -323,14 +326,9 @@ public void generateMapMetaData() throws HiveException {
     }
   }
 
-  protected Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
-      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
-    if (canSkipReload(mapContext)) {
-      // no need to reload
-      return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
-          mapJoinTables, mapJoinTableSerdes);
-    }
-
+  // Core logic to load hash table using HashTableLoader
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTableInternal(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(mapContext, mrContext, hconf, this);
     try {
@@ -347,9 +345,8 @@ public void generateMapMetaData() throws HiveException {
 
     hashTblInitedOnce = true;
 
-    Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair
-        = new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(mapJoinTables, mapJoinTableSerdes);
+    Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
+        new ImmutablePair<>(mapJoinTables, mapJoinTableSerdes);
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
 
@@ -357,10 +354,56 @@
     LOG.info("Skipping big table join processing for " + this.toString());
     this.setDone(true);
   }
-
     return pair;
   }
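+
+  // Subcache protocol: lock the slot for this task's bucket, probe the shared
+  // subcache, and build the hash table only on a miss. Later tasks for the
+  // same bucket take the lock, hit the cached pair, and skip the load entirely.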
+  // Load the hash table for a bucket map join.
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTableBMJ(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
+    // Bucket MapJoin in LLAP, make sure the caches are populated.
+    // Get the subcache.
+    LlapObjectSubCache<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>> subCache =
+        new LlapObjectSubCache<>(cache, cacheKey + "_BMJ", numBuckets);
+
+    subCache.lock(bucketId);
+    try {
+      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
+          subCache.get(bucketId);
+      if (pair != null) {
+        // Match found! Use it and update the tables.
+        mapJoinTables = pair.getLeft();
+        mapJoinTableSerdes = pair.getRight();
+        return pair;
+      }
+      pair = loadHashTableInternal(mapContext, mrContext);
+
+      // update the subcache
+      subCache.set(pair, bucketId);
+      return pair;
+    } finally {
+      subCache.unlock(bucketId);
+    }
+  }
+
+  protected Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
+    if (canSkipReload(mapContext)) {
+      // no need to reload
+      return new ImmutablePair<>(mapJoinTables, mapJoinTableSerdes);
+    }
+
+    if (conf.isBucketMapJoin() && cache instanceof LlapObjectCache &&
+        numBuckets > 0 && HiveConf.getBoolVar(hconf,
+        ConfVars.HIVE_TEZ_BMJ_USE_SUBCACHE)) {
+      // Bucket MapJoin in LLAP
+      return loadHashTableBMJ(mapContext, mrContext);
+    }
+
+    return loadHashTableInternal(mapContext, mrContext);
+  }
+
   // Load the hash table
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 8c158f53ac..26afe90faa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -20,20 +20,12 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.apache.tez.runtime.api.events.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -62,10 +54,6 @@
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
-import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -249,7 +237,9 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       }
     }
 
-    LOG.info("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap);
+    }
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap = getBucketSplitMapForPath(pathFileSplitsMap);
@@ -263,8 +253,10 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
 
     int availableSlots = totalResource / taskResource;
 
-    LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves
-        + " waves. Bucket initial splits map: " + bucketToInitialSplitMap);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Grouping splits. " + availableSlots + " available slots, " + waves
+          + " waves. Bucket initial splits map: " + bucketToInitialSplitMap);
+    }
 
     JobConf jobConf = new JobConf(conf);
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
@@ -458,7 +450,8 @@ private void processAllEvents(String inputName,
     // Set the actual events for the tasks.
     LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
     context.addRootInputEvents(inputName, taskEvents);
-    if (inputToGroupedSplitMap.isEmpty() == false) {
+
+    if (!inputToGroupedSplitMap.isEmpty()) {
       for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
         processAllSideEvents(entry.getKey(), entry.getValue());
       }
@@ -469,6 +462,24 @@
     // Only done when it is a bucket map join only no SMB.
     if (numInputsAffectingRootInputSpecUpdate == 1) {
       setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+      // Send the bucket IDs associated with the tasks; this must happen after parallelism is set.
+      sendBucketIdsToProcessor();
+    }
+  }
+
+  private void sendBucketIdsToProcessor() {
+    for (Entry<Integer, Collection<Integer>> entry : bucketToTaskMap.asMap().entrySet()) {
+      int bucketNum = entry.getKey();
+      for (Integer taskId : entry.getValue()) {
+        // Create the payload: the total bucket count followed by this task's bucket number.
+        ByteBuffer buffer = ByteBuffer.allocate(8);
+        buffer.putInt(numBuckets);
+        buffer.putInt(bucketNum);
+        buffer.flip();
+        // Create the event and send it to Tez. Tez will route it to the appropriate processor.
+        CustomProcessorEvent cpEvent = CustomProcessorEvent.create(buffer);
+        context.sendEventToProcessor(Collections.singletonList(cpEvent), taskId);
+      }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 4d70238f89..1db045f652 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -662,7 +662,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
           .setCustomInitializerDescriptor(descriptor).build();
     } else {
       // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
-      if (vertexHasCustomInput) {
+      if (vertexHasCustomInput && vertexType == VertexType.MULTI_INPUT_UNINITIALIZED_EDGES) {
+        // SMB Join.
         dataSource = MultiMRInput.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
       } else {
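The 8-byte payload built in sendBucketIdsToProcessor above is decoded by TezProcessor.handleEvents later in this patch. A minimal, self-contained sketch of that round trip outside of Tez (the class name and values are illustrative, not part of the patch):

import java.nio.ByteBuffer;
import java.nio.IntBuffer;

public class BucketPayloadRoundTrip {
  public static void main(String[] args) {
    // Producer side (CustomPartitionVertex): two ints, bucket count first.
    ByteBuffer buffer = ByteBuffer.allocate(8);
    buffer.putInt(4);  // numBuckets
    buffer.putInt(2);  // bucketNum assigned to this task
    buffer.flip();     // rewind so the consumer reads from the start

    // Consumer side (TezProcessor): an int view over the same bytes.
    IntBuffer intBuffer = buffer.asIntBuffer();
    System.out.println("numBuckets=" + intBuffer.get(0) + ", bucketId=" + intBuffer.get(1));
  }
}

Both sides rely on ByteBuffer's default big-endian order, so no explicit byte-order handling is needed.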
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +import java.lang.ref.SoftReference; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +/** + * LlapObjectSubCache. A subcache which lives inside the LlapObjectCache. + * The subcache maintains two lists + * 1. List of softreference to the objects + * 2. List of locks to access the objects. + */ +public class LlapObjectSubCache { + // List of softreferences + private Object[] softReferenceList; + // List of locks to protect the above list + private List locks; + + // Function to create subCache + private Object[] createSubCache(int numEntries) { + return new Object[numEntries]; + } + + // Function to setup locks + private List createSubCacheLocks(int numEntries) { + List lockList = new ArrayList<>(); + for (int i = 0; i < numEntries; i++) { + lockList.add(i, new ReentrantLock()); + } + return lockList; + } + + public LlapObjectSubCache(org.apache.hadoop.hive.ql.exec.ObjectCache cache, + String subCacheKey, + final int numEntries) throws HiveException { + softReferenceList = cache.retrieve(subCacheKey + "_main", () -> createSubCache(numEntries)); + locks = cache.retrieve(subCacheKey + "_locks", () -> createSubCacheLocks(numEntries)); + } + + public void lock(final int index) { + locks.get(index).lock(); + } + + public void unlock(final int index) { + locks.get(index).unlock(); + } + + + @SuppressWarnings("unchecked") + public T get(final int index) { + // Must be held by same thread + Preconditions.checkState(locks.get(index).isHeldByCurrentThread()); + if (softReferenceList[index] != null) { + return ((SoftReference)(softReferenceList[index])).get(); + } + return null; + } + + public void set(T value, final int index) { + // Must be held by same thread + Preconditions.checkState(locks.get(index).isHeldByCurrentThread()); + softReferenceList[index] = new SoftReference<>(value); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index a2d46ef000..fa6160fe3c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.text.NumberFormat; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hive.conf.Constants; import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -154,7 +158,16 @@ public void close() throws IOException { @Override public void handleEvents(List arg0) { - //this is not called by tez, so nothing to be done here + // As of now only used for Bucket MapJoin, there is exactly one event in the list. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index a2d46ef000..fa6160fe3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
 import java.text.NumberFormat;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -154,7 +158,16 @@ public void close() throws IOException {
 
   @Override
   public void handleEvents(List<Event> arg0) {
-    //this is not called by tez, so nothing to be done here
+    // As of now this is only used for bucket map join; there is at most one event in the list.
+    assert arg0.size() <= 1;
+    for (Event event : arg0) {
+      CustomProcessorEvent cpEvent = (CustomProcessorEvent) event;
+      ByteBuffer buffer = cpEvent.getPayload();
+      // Get an int view of the buffer: int 0 is the bucket count, int 1 is this task's bucket id.
+      IntBuffer intBuffer = buffer.asIntBuffer();
+      jobConf.setInt(Constants.LLAP_NUM_BUCKETS, intBuffer.get(0));
+      jobConf.setInt(Constants.LLAP_BUCKET_ID, intBuffer.get(1));
+    }
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index e59da61664..dc698c8de8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -99,7 +99,12 @@
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
     // adjust noconditional task size threshold for LLAP
-    MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf);
+    LlapClusterStateForCompile llapInfo = null;
+    if ("llap".equalsIgnoreCase(context.conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
+      llapInfo = LlapClusterStateForCompile.getClusterInfo(context.conf);
+      llapInfo.initClusterInfo();
+    }
+    MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf, llapInfo);
     joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
 
     // not use map join in case of cross product
@@ -148,11 +153,15 @@
       }
     }
 
-    if (numBuckets > 1) {
-      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-        if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+      // If we are in LLAP, decide between a bucket map join (BMJ) and a dynamically
+      // partitioned hash join (DPHJ).
+      if (llapInfo != null) {
+        if (selectJoinForLlap(context, joinOp, tezBucketJoinProcCtx, llapInfo, mapJoinConversionPos, numBuckets)) {
           return null;
         }
+      } else if (numBuckets > 1 &&
+          convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+        return null;
       }
     }
 
@@ -181,16 +190,73 @@
     return null;
   }
 
+  private boolean selectJoinForLlap(OptimizeTezProcContext context, JoinOperator joinOp,
+                                    TezBucketJoinProcCtx tezBucketJoinProcCtx,
+                                    LlapClusterStateForCompile llapInfo,
+                                    int mapJoinConversionPos, int numBuckets) throws SemanticException {
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)
+        && numBuckets > 1) {
+      // DPHJ is disabled; only attempt a BMJ or plain map join.
+      return convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx);
+    }
+
+    int numExecutorsPerNode = -1;
+    if (llapInfo.hasClusterInfo()) {
+      numExecutorsPerNode = llapInfo.getNumExecutorsPerNode();
+    }
+    if (numExecutorsPerNode == -1) {
+      numExecutorsPerNode = context.conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    }
+
+    int numNodes = llapInfo.getKnownExecutorCount() / numExecutorsPerNode;
+
+    LOG.debug("Number of nodes = " + numNodes + ". Number of executors per node = " + numExecutorsPerNode);
+
+    // Determine the total size of the small-table inputs.
+    long totalSize = 0;
+    for (int pos = 0; pos < joinOp.getParentOperators().size(); pos++) {
+      if (pos == mapJoinConversionPos) continue;
+      Operator<? extends OperatorDesc> parentOp = joinOp.getParentOperators().get(pos);
+      totalSize += parentOp.getStatistics().getDataSize();
+    }
+
+    // Size of the big table.
+    long bigTableSize = joinOp.getParentOperators().get(mapJoinConversionPos).getStatistics().getDataSize();
+
+    // Network cost of DPHJ: every input is shuffled once.
+    long networkCostDPHJ = totalSize + bigTableSize;
+
+    LOG.info("Cost of dynamically partitioned hash join : total small table size = " + totalSize
+        + " bigTableSize = " + bigTableSize + " networkCostDPHJ = " + networkCostDPHJ);
+
+    // Network cost of a map-side join: the small tables are broadcast to every node.
+    long networkCostMJ = numNodes * totalSize;
+    LOG.info("Cost of Bucket Map Join : numNodes = " + numNodes + " total small table size = "
+        + totalSize + " networkCostMJ = " + networkCostMJ);
+
+    if (networkCostDPHJ < networkCostMJ) {
+      LOG.info("Dynamically partitioned Hash Join chosen");
+      long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      return convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize);
+    } else if (numBuckets > 1) {
+      LOG.info("Bucket Map Join chosen");
+      return convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx);
+    }
+    // Fall back to a plain map join with no bucket scaling.
+    LOG.info("Falling back to map join with no bucket scaling");
+    return false;
+  }
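To make the trade-off concrete with illustrative numbers: for small tables totalling 1 GB, a 10 GB big table and 20 LLAP nodes, networkCostDPHJ = 1 GB + 10 GB = 11 GB while networkCostMJ = 20 * 1 GB = 20 GB, so the dynamically partitioned hash join is chosen; on a 5-node cluster the broadcast costs only 5 GB and the bucket map join wins instead.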
 
   @VisibleForTesting
-  public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize, final HiveConf conf) {
+  public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize,
+                                                final HiveConf conf,
+                                                LlapClusterStateForCompile llapInfo) {
     final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
     final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
     final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
     final float inflationFactor = conf.getFloatVar(ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
     final MemoryMonitorInfo memoryMonitorInfo;
-    if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
-      LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
-      llapInfo.initClusterInfo();
+    if (llapInfo != null) {
       final int executorsPerNode;
       if (!llapInfo.hasClusterInfo()) {
         LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
@@ -229,7 +295,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
       TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
-    if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
+    if (!(HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN))
         || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
             && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
       fallbackToReduceSideJoin(joinOp, context, maxSize);
@@ -391,7 +457,7 @@ private void setAllChildrenTraits(Operator<? extends OperatorDesc> currentOp, Op
   private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
 
-    if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
+    if (!checkConvertJoinBucketMapJoin(joinOp, bigTablePosition, tezBucketJoinProcCtx)) {
       LOG.info("Check conversion to bucket map join failed.");
       return false;
     }
@@ -458,6 +524,13 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
         rsOp.getConf().setPartitionCols(newPartitionCols);
       }
     }
+
+    // Update the memory monitor info for LLAP.
+    MemoryMonitorInfo memoryMonitorInfo = joinDesc.getMemoryMonitorInfo();
+    if (memoryMonitorInfo.isLlap()) {
+      memoryMonitorInfo.setHashTableInflationFactor(1);
+      memoryMonitorInfo.setMemoryOverSubscriptionFactor(0);
+    }
     return true;
   }
@@ -503,15 +576,14 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
         return false;
       }
       ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
-      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
-          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, false) == false) {
+      if (!checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
+          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), false)) {
         LOG.info("We cannot convert to SMB because the sort column names do not match.");
         return false;
       }
 
-      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
-          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, true)
-          == false) {
+      if (!checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
+          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), true)) {
         LOG.info("We cannot convert to SMB because bucket column names do not match.");
         return false;
       }
@@ -540,8 +612,8 @@ private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> curre
    * can create a bucket map join eliminating the reduce sink.
    */
   private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
-      OptimizeTezProcContext context, int bigTablePosition,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx)
+      throws SemanticException {
     // bail on mux-operator because mux operator masks the emit keys of the
     // constituent reduce sinks
     if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -556,8 +628,7 @@ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
     List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
     int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
     // all keys matched.
-    if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
-        tezBucketJoinProcCtx, true) == false) {
+    if (!checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(), true)) {
       LOG.info("No info available to check for bucket map join. Cannot convert");
Cannot convert"); return false; } @@ -575,13 +646,13 @@ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, private boolean checkColEquality(List> grandParentColNames, List> parentColNames, Map colExprMap, - TezBucketJoinProcCtx tezBucketJoinProcCtx, boolean strict) { + boolean strict) { if ((grandParentColNames == null) || (parentColNames == null)) { return false; } - if ((parentColNames != null) && (parentColNames.isEmpty() == false)) { + if (!parentColNames.isEmpty()) { for (List listBucketCols : grandParentColNames) { // can happen if this operator does not carry forward the previous bucketing columns // for e.g. another join operator which does not carry one of the sides' key columns @@ -608,15 +679,7 @@ private boolean checkColEquality(List> grandParentColNames, } if (colCount == parentColNames.get(0).size()) { - if (strict) { - if (colCount == listBucketCols.size()) { - return true; - } else { - return false; - } - } else { - return true; - } + return !strict || (colCount == listBucketCols.size()); } } } @@ -722,7 +785,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c boolean currentInputNotFittingInMemory = false; if ((bigInputStat == null) - || ((bigInputStat != null) && (inputSize > bigInputStat.getDataSize()))) { + || (inputSize > bigInputStat.getDataSize())) { if (foundInputNotFittingInMemory) { // cannot convert to map join; we've already chosen a big table diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index 0806007d28..df19d72411 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin; +import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.CollectDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -442,9 +443,14 @@ public void testNoConditionalTaskSizeForLlap() { long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L; HiveConf hiveConf = new HiveConf(); + LlapClusterStateForCompile llapInfo = null; + if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) { + llapInfo = LlapClusterStateForCompile.getClusterInfo(hiveConf); + llapInfo.initClusterInfo(); + } // execution mode not set, null is returned assertEquals(defaultNoConditionalTaskSize, convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, - hiveConf).getAdjustedNoConditionalTaskSize()); + hiveConf, llapInfo).getAdjustedNoConditionalTaskSize()); hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap"); // default executors is 4, max slots is 3. 
     // default executors is 4, max slots is 3, so 3 * 20% of the noconditional task size will be oversubscribed
     double fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
     int maxSlots = 3;
     long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
     assertEquals(expectedSize,
-        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf)
+        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo)
             .getAdjustedNoConditionalTaskSize());
 
     // num executors is less than max executors per query (which is not expected case), default executors will be
@@ -462,18 +468,18 @@
     hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
     expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
     assertEquals(expectedSize,
-        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf)
+        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo)
             .getAdjustedNoConditionalTaskSize());
 
     // disable memory checking
     hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0");
     assertFalse(
-        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring());
+        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring());
 
     // invalid inflation factor
     hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000");
     hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f");
     assertFalse(
-        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring());
+        convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring());
   }
 }
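With the defaults exercised by this test (1 GB noconditional task size, 20% oversubscription factor, 3 usable slots), the first assertion expects an adjusted size of 1 GB + 3 * 0.2 * 1 GB = 1.6 GB.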