diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index a1e0babf85..5706936d36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -20,10 +20,12 @@
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -49,6 +51,7 @@
 import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -103,6 +106,12 @@
   // Only this table has spilled big table rows
   protected transient boolean isTestingNoHashTableLoad;
 
+  // Only used in bucket map join.
+  private transient int numBuckets = -1;
+  private transient int bucketId = -1;
+  private transient String BMJsubCacheKey;
+  private transient String subCacheLocksKey;
+  private transient int counter = 0; // count access to subcache
 
   /** Kryo ctor. */
   protected MapJoinOperator() {
@@ -149,6 +158,10 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     int tagLen = conf.getTagLength();
 
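+    // The bucket count and the bucket id of this task are published into the JobConf by
+    // TezProcessor.handleEvents() from the CustomProcessorEvent payload sent by
+    // CustomPartitionVertex; both stay at -1 when no such event is received.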
+    bucketId = hconf.getInt("mapred.bucket.id", -1);
+    numBuckets = hconf.getInt("mapred.num.buckets", -1);
+    LOG.info("Bucket map join: numBuckets = " + numBuckets + ", bucketId = " + bucketId);
+
     // On Tez only: The hash map might already be cached in the container we run
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
@@ -156,6 +169,10 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);
 
+    // Bucket MapJoin specific cache keys
+    BMJsubCacheKey = cacheKey + "_BMJ";
+    subCacheLocksKey = cacheKey + "_BMJ_locks";
+
     hashMapRowGetters = null;
 
     mapJoinTables = new MapJoinTableContainer[tagLen];
@@ -193,16 +210,10 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       Future<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>> future = cache.retrieveAsync(
-          cacheKey,
-          new Callable<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>() {
-            @Override
-            public Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> call()
-                throws HiveException {
-              return loadHashTable(mapContext, mrContext);
-            }
-          });
+          cacheKey, () -> loadHashTable(mapContext, mrContext));
       asyncInitOperations.add(future);
     } else if (!isInputFileChangeSensitive(mapContext)) {
+      LOG.info("initializeOp: loading hash table synchronously");
       loadHashTable(mapContext, mrContext);
       hashTblInitedOnce = true;
     }
@@ -291,7 +302,7 @@ public void setTestMapJoinTableContainer(int posSmallTable,
     return valueOI;
   }
 
-  public void generateMapMetaData() throws HiveException {
+  private void generateMapMetaData() throws HiveException {
     // generate the meta data for key
     // index for key is -1
@@ -323,14 +334,29 @@ public void generateMapMetaData() throws HiveException {
     }
   }
 
-  protected Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
-      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
-    if (canSkipReload(mapContext)) {
-      // no need to reload
-      return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
-          mapJoinTables, mapJoinTableSerdes);
+  // Function to create the subcache of hash table soft references used in bucket mapjoin.
+  private List<SoftReference<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>> createSubCache(int numBuckets) {
+    List<SoftReference<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>> subCache = new ArrayList<>(numBuckets);
+    for (int i = 0; i < numBuckets; i++) {
+      subCache.add(null);
+    }
+    return subCache;
+  }
+
+  // Function to create the subcache locks, one per bucket.
+  private List<ReentrantLock> createSubCacheLocks(int numBuckets) {
+    List<ReentrantLock> subCacheLocks = new ArrayList<>(numBuckets);
+    for (int i = 0; i < numBuckets; i++) {
+      subCacheLocks.add(i, new ReentrantLock());
     }
+    return subCacheLocks;
+  }
+
+  // Core logic to load the hash table using the HashTableLoader.
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTableInternal(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(mapContext, mrContext, hconf, this);
     try {
@@ -347,9 +373,8 @@ public void generateMapMetaData() throws HiveException {
 
       hashTblInitedOnce = true;
 
-      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair
-          = new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> (mapJoinTables, mapJoinTableSerdes);
+      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
+          new ImmutablePair<> (mapJoinTables, mapJoinTableSerdes);
 
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
@@ -357,10 +382,66 @@ public void generateMapMetaData() throws HiveException {
       LOG.info("Skipping big table join processing for " + this.toString());
       this.setDone(true);
     }
-
     return pair;
   }
 
+  // Load the hash table for Bucket MapJoin.
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTableBMJ(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
+
+    assert numBuckets > 0;
+    // Bucket MapJoin in LLAP, make sure the caches are populated.
+    // Get the subcache.
+    List<SoftReference<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>> subCache =
+        cache.retrieve(BMJsubCacheKey, () -> createSubCache(numBuckets));
+
+    // Get the subcache locks.
+    List<ReentrantLock> subCacheLock = cache.retrieve(subCacheLocksKey, () -> createSubCacheLocks(numBuckets));
+
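+    // Serialize loading per bucket: the first task to take the lock loads the hash table for
+    // its bucket, while later tasks for the same bucket wait and then reuse the cached pair.
+    // The SoftReference lets the cached tables be collected under memory pressure, in which
+    // case they are simply reloaded here.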
+    LOG.info("Bucket map join: loading hash table for bucketId = " + bucketId);
+
+    subCacheLock.get(bucketId).lock();
+    LOG.info("Acquired subcache lock for bucketId = " + bucketId);
+    try {
+      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair = null;
+      if (subCache.get(bucketId) != null) {
+        // The soft reference may have been cleared by the GC, so check the referent too.
+        pair = subCache.get(bucketId).get();
+      }
+      if (pair != null) {
+        // match found! use it
+        LOG.info("Reusing cached hash table for bucketId = " + bucketId);
+        // update the tables.
+        mapJoinTables = pair.getLeft();
+        mapJoinTableSerdes = pair.getRight();
+        counter++;
+        return pair;
+      }
+
+      pair = loadHashTableInternal(mapContext, mrContext);
+      // Update the subcache.
+      subCache.set(bucketId, new SoftReference<>(pair));
+      LOG.info("Loaded hash table for bucketId = " + bucketId);
+      return pair;
+    } finally {
+      LOG.info("Releasing subcache lock for bucketId = " + bucketId);
+      subCacheLock.get(bucketId).unlock();
+    }
+  }
+
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
+    if (canSkipReload(mapContext)) {
+      // no need to reload
+      return new ImmutablePair<>(mapJoinTables, mapJoinTableSerdes);
+    }
+
+    if (conf.isBucketMapJoin() && cache instanceof LlapObjectCache &&
+        numBuckets > 0) {
+      // Bucket MapJoin in LLAP
+      return loadHashTableBMJ(mapContext, mrContext);
+    }
+
+    return loadHashTableInternal(mapContext, mrContext);
+  }
+
   // Load the hash table
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
@@ -597,6 +678,7 @@ public void closeOp(boolean abort) throws HiveException {
     }
 
     this.loader = null;
+    LOG.info("Bucket map join subcache hits = " + counter);
     super.closeOp(abort);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 8974e9b79b..3c1b77e8ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -20,20 +20,12 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.apache.tez.runtime.api.events.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -62,10 +54,6 @@
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
-import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -458,7 +446,8 @@ private void processAllEvents(String inputName,
     // Set the actual events for the tasks.
LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size()); context.addRootInputEvents(inputName, taskEvents); - if (inputToGroupedSplitMap.isEmpty() == false) { + + if (!inputToGroupedSplitMap.isEmpty()) { for (Entry> entry : inputToGroupedSplitMap.entrySet()) { processAllSideEvents(entry.getKey(), entry.getValue()); } @@ -470,6 +459,24 @@ private void processAllEvents(String inputName, if (numInputsAffectingRootInputSpecUpdate == 1) { setVertexParallelismAndRootInputSpec(inputNameInputSpecMap); } + // Send the bucket IDs associated with the tasks, must happen after parallelism is set. + sendBucketIdsToProcessor(); + } + + private void sendBucketIdsToProcessor() { + for (Entry> entry : bucketToTaskMap.asMap().entrySet()) { + int bucketNum = entry.getKey(); + for (Integer taskId : entry.getValue()) { + // Create payload + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putInt(numBuckets); + buffer.putInt(bucketNum); + buffer.flip(); + // Create the event and send it tez. Tez will route it to appropriate processor + CustomProcessorEvent cpEvent = new CustomProcessorEvent(buffer); + context.sendEventToProcessor(Collections.singletonList(cpEvent), taskId); + } + } } private void diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 00b34865af..b889a63198 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.nio.ByteBuffer; import java.text.NumberFormat; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -154,7 +156,14 @@ public void close() throws IOException { @Override public void handleEvents(List arg0) { - //this is not called by tez, so nothing to be done here + // As of now only used for Bucket MapJoin, there is exactly one event in the list. + assert arg0.size() <= 1; + for (Event event : arg0) { + CustomProcessorEvent cpEvent = (CustomProcessorEvent) event; + ByteBuffer buffer = cpEvent.getPayload(); + jobConf.setInt("mapred.num.buckets", buffer.getInt()); + jobConf.setInt("mapred.bucket.id", buffer.getInt()); + } } @Override