Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java	(date 1579266955397)
@@ -75,6 +75,12 @@
     putRowInternal(currentKey, currentValue);
   }
 
+  @Override
+  public boolean containsKey(byte[] currentKey) {
+    // Method to be used only for probe-decode with the FAST hash table implementations.
+    throw new UnsupportedOperationException("containsKey is not implemented for the OPTIMIZED hash table");
+  }
+
   protected void putRowInternal(BytesWritable key, BytesWritable value)
       throws SerDeException, HiveException, IOException {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java	(date 1579264317460)
@@ -154,6 +154,11 @@
     }
   }
 
+  @Override
+  public boolean containsKey(byte[] currentKey) {
+    return adaptContainsKey(currentKey);
+  }
+
   /*
    * A Unit Test convenience method for putting key and value into the hash table using the
    * actual types.
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(date 1579179393000)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(date 1579279606783)
@@ -1756,6 +1756,11 @@
         "use BloomFilter in Hybrid grace hash join to minimize unnecessary spilling."),
     HIVEMAPJOINFULLOUTER("hive.mapjoin.full.outer", true,
         "Whether to use MapJoin for FULL OUTER JOINs."),
+    HIVE_MAPJOIN_PROBEDECODE_ENABLED("hive.mapjoin.probedecode.enabled", false,
+        "Use the cached MapJoin hash table created on the small-table side to filter out rows that are not\n" +
+        "going to be used when reading the large-table data. This results in fewer CPU cycles spent decoding unused data."),
+    HIVE_MAPJOIN_PROBEDECODE_COLKEY("hive.mapjoin.probedecode.cachekey", 0,
+        "The column key the RecordReader will use to probe the hash table. Operator specific."),
     HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE(
         "hive.test.mapjoin.full.outer.override", "none", new StringSet("none", "enable", "disable"),
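For context, the two new settings are consumed on the reader side through the standard HiveConf accessors. A minimal sketch of the intended lookup, assuming a JobConf that the reader has already populated (the class and method names here are illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    final class ProbeDecodeConfSketch {
      // Sketch: gate probe-decode and resolve the probe column index from the job conf.
      static int resolveProbeColumn(JobConf job) {
        if (!HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED)) {
          return -1; // disabled: decode all columns as usual
        }
        // LlapRecordReader writes the single projected key column index under this key.
        return HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_COLKEY);
      }
    }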
Operator specific."), HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE( "hive.test.mapjoin.full.outer.override", "none", new StringSet("none", "enable", "disable"), Index: llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java (date 1579179393000) +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java (date 1579283471485) @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.io.IOException; import java.util.List; +import java.util.Stack; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -41,8 +42,12 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -53,6 +58,7 @@ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -104,6 +110,7 @@ private final ExecutorService executor; private final boolean isAcidScan; private final boolean isAcidFormat; + private final boolean probeDecodeEnabled; /** * Creates the record reader and checks the input-specific compatibility. @@ -173,6 +180,9 @@ LOG.info("Queue limit for LlapRecordReader is " + limit); this.queue = new ArrayBlockingQueue<>(limit); + this.probeDecodeEnabled = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED); + LOG.info("LlapRecordReader ProbeDecode is " + this.probeDecodeEnabled); + int partitionColumnCount = rbCtx.getPartitionColumnCount(); if (partitionColumnCount > 0) { @@ -195,11 +205,61 @@ this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx, schema, job, isAcidScan && acidReader.includeAcidColumns()); + if (this.probeDecodeEnabled) { + this.includes.setProbeDecodeOp(getProbeDecodeHashTable(mapWork, job)); + } + // Create the consumer of encoded data; it will coordinate decoding to CVBs. 
+  private static VectorMapJoinCommonOperator getProbeDecodeHashTable(MapWork mapWork, JobConf job) {
+    Stack<Operator<?>> opStack = new Stack<>();
+    // DFS over the operator tree (a Stack is LIFO).
+    opStack.addAll(mapWork.getWorks());
+    VectorMapJoinCommonOperator vop = null;
+    while (!opStack.empty()) {
+      Operator<?> op = opStack.pop();
+      if (op instanceof VectorMapJoinCommonOperator && noDynamicPruningMapJoin(op) && validProbeDecodeMapJoin(op)) {
+        vop = (VectorMapJoinCommonOperator) op;
+        int colIndex = vop.getInputVectorizationContext().getProjectedColumns().get(0);
+        job.setInt(ConfVars.HIVE_MAPJOIN_PROBEDECODE_COLKEY.varname, colIndex);
+        LOG.info("ProbeDecode found MapJoin op: " + op.getName() + " with columnId: " + colIndex +
+            " columnName: " + vop.getInputVectorizationContext().getProjectionColumnNames().get(0));
+        break;
+      }
+      if (op.getChildOperators() != null) {
+        opStack.addAll(op.getChildOperators());
+      }
+    }
+    return vop;
+  }
+
+  // A valid probe-decode MapJoin projects exactly one (key) column.
+  private static boolean validProbeDecodeMapJoin(Operator<?> mapJoinOp) {
+    return mapJoinOp instanceof VectorMapJoinCommonOperator &&
+        ((VectorMapJoinCommonOperator) mapJoinOp).getInputVectorizationContext().getProjectedColumns().size() == 1;
+  }
+
+  // A MapJoin with DPP is not a valid option for probe-decode.
+  private static boolean noDynamicPruningMapJoin(Operator<?> mapJoinOp) {
+    Stack<Operator<?>> opStack = new Stack<>();
+    // DFS over the children of the MapJoin.
+    opStack.addAll(mapJoinOp.getChildOperators());
+    while (!opStack.empty()) {
+      Operator<?> op = opStack.pop();
+      // Check if there is a Dynamic Partitioning Event involved.
+      if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+        // Found a dynamic partition pruning operator.
+        return false;
+      }
+      if (op.getChildOperators() != null) {
+        opStack.addAll(op.getChildOperators());
+      }
+    }
+    return true;
+  }
+
   private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) {
     // Check job config for overrides, otherwise use the default server value.
     int jobVal = jobConf.getInt(var.varname, -1);
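Stripped of the Hive operator types, the traversal above is a plain depth-first search that stops at the first node satisfying a predicate (here, "vectorized MapJoin without DPP that projects a single key column"). A self-contained sketch of that shape, with Node and the predicate as illustrative stand-ins for Operator and the two validity checks:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;
    import java.util.function.Predicate;

    final class TreeSearch {
      interface Node { List<? extends Node> children(); }

      // Depth-first search: returns the first node matching the predicate, or null.
      static Node findFirst(List<? extends Node> roots, Predicate<Node> matches) {
        Deque<Node> stack = new ArrayDeque<>(roots);
        while (!stack.isEmpty()) {
          Node n = stack.pop();
          if (matches.test(n)) {
            return n;
          }
          if (n.children() != null) {
            n.children().forEach(stack::push);
          }
        }
        return null;
      }
    }

Note that stopping at the first match assumes at most one probe-decode candidate per MapWork; with several MapJoins in the tree the traversal order decides which one wins.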
@@ -627,6 +687,9 @@
     private TypeDescription readerSchema;
     private JobConf jobConf;
 
+    // ProbeDecode hash table ref (held via the MapJoin operator that owns it).
+    private VectorMapJoinCommonOperator probeDecodeOp;
+
     public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
         VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema, JobConf jobConf,
         boolean includeAcidColumns) {
@@ -644,7 +707,6 @@
           tableIncludedCols.add(i);
         }
       }
-      LOG.debug("Logical table includes: {}", tableIncludedCols);
       this.readerLogicalColumnIds = tableIncludedCols;
       // Note: schema evolution currently does not support column index changes.
       //       So, the indices should line up... to be fixed in SE v2?
@@ -683,6 +745,20 @@
       this.includeAcidColumns = includeAcidColumns;
     }
 
+    public void setProbeDecodeOp(VectorMapJoinCommonOperator probeDecodeOp) {
+      this.probeDecodeOp = probeDecodeOp;
+    }
+
+    @Override
+    public VectorMapJoinHashTable getProbeDecodeTable() {
+      return probeDecodeOp.getVectorMapJoinHashTable();
+    }
+
+    @Override
+    public JobConf getJobConf() {
+      return jobConf;
+    }
+
     @Override
     public String toString() {
       return "logical columns " + readerLogicalColumnIds
@@ -724,4 +800,4 @@
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
   }
-}
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java	(date 1579190695149)
@@ -68,6 +68,7 @@
         if (value != null && LOG.isDebugEnabled()) {
           LOG.debug("Found " + key + " in cache");
         }
+        LOG.info("ProbeDecode reading sync-cached key: " + key + " with value: " + value);
         return value;
       } finally {
         lock.unlock();
@@ -110,6 +111,7 @@
           if (LOG.isDebugEnabled()) {
             LOG.debug("Found " + key + " in cache");
           }
+          LOG.info("ProbeDecode reading cached key: " + key + " with value: " + value);
           return value;
         }
       } finally {
@@ -127,7 +129,7 @@
       if (LOG.isDebugEnabled()) {
         LOG.debug("Caching new object for key: " + key);
       }
-
+      LOG.info("ProbeDecode caching key: " + key + " with value: " + value);
       registry.put(key, value);
       locks.remove(key);
     } finally {
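The LlapObjectCache changes above only add tracing; the underlying contract is a per-query keyed registry where the first caller computes a value and concurrent callers reuse it. A simplified sketch of that memoization contract using ConcurrentHashMap.computeIfAbsent instead of LlapObjectCache's explicit lock-and-retry protocol (class and method names here are illustrative):

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: first caller for a key builds the value; later callers reuse it.
    final class SimpleObjectCache {
      private final Map<String, Object> registry = new ConcurrentHashMap<>();

      @SuppressWarnings("unchecked")
      <T> T retrieve(String key, Callable<T> loader) {
        return (T) registry.computeIfAbsent(key, k -> {
          try {
            return loader.call(); // e.g. deserialize the small-table hash table
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
      }
    }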
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java	(date 1579265350747)
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hive.common.util.HashCodeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
@@ -53,6 +54,40 @@
     add(keyBytes, 0, keyLength, currentValue);
   }
 
+  // The same method works for all bytes-keyed implementations (hash map, hash multi-set, or hash set).
+  @Override
+  public boolean containsKey(byte[] currentKey) {
+    int keyLength = currentKey.length;
+    long hashCode = HashCodeUtil.murmurHash(currentKey, 0, keyLength);
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    boolean keyExists;
+    long refWord;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      refWord = slots[slot];
+      if (refWord == 0) {
+        keyExists = false;
+        break;
+      }
+      if (VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) == partialHashCode &&
+          VectorMapJoinFastBytesHashKeyRef.equalKey(
+              refWord, currentKey, 0, keyLength, writeBuffers, unsafeReadPos)) {
+        keyExists = true;
+        break;
+      }
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
+    return keyExists;
+  }
+
   public abstract void add(byte[] keyBytes, int keyStart, int keyLength,
       BytesWritable currentValue);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java	(date 1579263956966)
@@ -41,6 +41,13 @@
   void putRow(BytesWritable currentKey, BytesWritable currentValue)
       throws SerDeException, HiveException, IOException;
 
+  /**
+   * Check whether a serialized key is present in the hash table (used by probe-decode).
+   *
+   * @param currentKey the key to look up, serialized as bytes
+   * @return true if the hash table contains the given key
+   */
+  boolean containsKey(byte[] currentKey);
+
   /**
    * Get hash table size
    */
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java	(date 1579264981665)
@@ -55,6 +55,11 @@
     adaptPutRow(currentKey, currentValue);
   }
 
+  @Override
+  public boolean containsKey(byte[] currentKey) {
+    return adaptContainsKey(currentKey);
+  }
+
   /*
    * A Unit Test convenience method for putting the key into the hash table using the
    * actual type.
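The containsKey probe above mirrors the existing lookup path of the FAST tables: hash the key, start at hashCode & mask, and on each collision advance by a growing step (+1, +2, +3, ...), i.e. triangular-number probing, until an empty slot proves absence. A self-contained sketch of the same probe sequence over a plain long[] slot array (illustrative, not the Hive class; the full-key comparison is abstracted behind a predicate):

    import java.util.function.LongPredicate;

    // Sketch: triangular-number open-addressing lookup over a power-of-two table.
    final class ProbeSketch {
      private final long[] slots;   // 0 means empty; otherwise a reference word for the key
      private final int mask;       // slots.length - 1, with slots.length a power of two

      ProbeSketch(long[] slots) {
        this.slots = slots;
        this.mask = slots.length - 1;
      }

      boolean contains(long hashCode, LongPredicate sameKey) {
        int slot = (int) (hashCode & mask);
        long probeSlot = slot;
        int i = 0;
        while (true) {
          long refWord = slots[slot];
          if (refWord == 0) {
            return false;             // empty slot: the key cannot be in the table
          }
          if (sameKey.test(refWord)) {
            return true;              // caller verifies the full key behind the reference word
          }
          probeSlot += (++i);         // steps 1, 2, 3, ... visit every slot of a power-of-two table
          slot = (int) (probeSlot & mask);
        }
      }
    }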
Index: llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java	(date 1579179393000)
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java	(date 1579280454725)
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -49,6 +50,8 @@
     List<Integer> getPhysicalColumnIds();
     List<Integer> getReaderLogicalColumnIds();
     TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
+    VectorMapJoinHashTable getProbeDecodeTable();
+    JobConf getJobConf();
   }
 
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java	(date 1579281259073)
@@ -221,6 +221,7 @@
     int allMatchCount = 0;
     int equalKeySeriesCount = 0;
     int spillCount = 0;
+    int nullSkippedCount = 0;
 
     /*
      * Single-Column Long specific variables.
@@ -245,6 +246,7 @@
         if (!joinColVector.noNulls && joinColVector.isNull[batchIndex]) {
           currentKey = 0;
           isNull = true;
+          nullSkippedCount++;
         } else {
           currentKey = vector[batchIndex];
           isNull = false;
@@ -364,6 +366,7 @@
       if (LOG.isDebugEnabled()) {
         LOG.debug(CLASS_NAME + " allMatchs " + intArrayToRangesString(allMatchs, allMatchCount) +
+            " skippedNulls " + nullSkippedCount +
             " equalKeySeriesHashMapResultIndices " + intArrayToRangesString(equalKeySeriesHashMapResultIndices, equalKeySeriesCount) +
             " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) +
             " equalKeySeriesIsSingleValue " + Arrays.toString(Arrays.copyOfRange(equalKeySeriesIsSingleValue, 0, equalKeySeriesCount)) +
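The new counter works because of how probe-decode hands skipped rows to the join: the reader marks filtered rows as nulls in the key column, and the inner-long join already treats null keys as non-matches, so no new skip path is needed on the operator side. The per-batch accounting reduces to the following (a self-contained sketch of the counting logic over a LongColumnVector; the class is illustrative):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    final class NullSkipCount {
      // Sketch: count key slots the join will skip because probe-decode nulled them out.
      static int countSkippedNulls(LongColumnVector joinColVector, int batchSize) {
        int nullSkippedCount = 0;
        for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
          if (!joinColVector.noNulls && joinColVector.isNull[batchIndex]) {
            nullSkippedCount++; // same condition the operator checks before key lookup
          }
        }
        return nullSkippedCount;
      }
    }

One caveat this overloading implies: rows whose key was genuinely NULL in the data and rows filtered by probe-decode become indistinguishable, which is fine for an inner join but would not be for joins that must preserve null-keyed rows.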
 @Override
 public ReadPipeline createReadPipeline(
     Consumer<ColumnVectorBatch> consumer, FileSplit split, Includes includes,
@@ -90,6 +91,7 @@
     InputFormat<?, ?> unused0, Deserializer unused1,
     Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws IOException {
   cacheMetrics.incrCacheReadRequests();
+  LlapIoImpl.LOG.info("ProbeDecode ORC split: " + split);
   OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
       consumer, includes, _skipCorrupt, counters, ioMetrics);
   OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java	(date 1579282734539)
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -215,6 +217,41 @@
     metricExpands++;
   }
 
+  public void printHashTableKeys() {
+    int found = 0;
+    for (int slot = 0; slot < logicalHashBucketCount; slot++) {
+      int pairIndex = slot * 2;
+      long valueRef = slotPairs[pairIndex];
+      if (valueRef != 0) {
+        long tableKey = slotPairs[pairIndex + 1];
+        LOG.debug("SlotCount: " + (++found) + " HT key: " + tableKey + " with valueRef: " + valueRef);
+      }
+    }
+  }
+
+  public ArrayList<Long> getHashTableKeys() {
+    ArrayList<Long> toReturn = new ArrayList<>(keysAssigned);
+    for (int slot = 0; slot < logicalHashBucketCount; slot++) {
+      int pairIndex = slot * 2;
+      long valueRef = slotPairs[pairIndex];
+      if (valueRef != 0) {
+        long tableKey = slotPairs[pairIndex + 1];
+        toReturn.add(tableKey);
+      }
+    }
+    return toReturn;
+  }
+
+  public boolean adaptContainsKey(byte[] currentKey) {
+    long key = Longs.fromByteArray(currentKey);
+    return containsKey(key);
+  }
+
+  protected boolean containsKey(long key) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return findReadSlot(key, hashCode) != -1;
+  }
+
   protected int findReadSlot(long key, long hashCode) {
 
     int intHashCode = (int) hashCode;
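adaptContainsKey assumes each long key arrives as the 8-byte big-endian array that Guava's Longs.toByteArray produces, which is exactly what the consumer side below does before probing. A quick round-trip check of that assumption (illustrative test class):

    import com.google.common.primitives.Longs;

    final class KeyEncodingCheck {
      public static void main(String[] args) {
        long key = 20200117L;
        byte[] bytes = Longs.toByteArray(key);      // 8 bytes, big-endian
        assert bytes.length == 8;
        assert Longs.fromByteArray(bytes) == key;   // lossless round trip
        System.out.println("round trip ok: " + Longs.fromByteArray(bytes));
      }
    }

A design note: encoding every probed long into a fresh byte[] only to decode it back allocates once per row; exposing a long-specialized containsKey(long) on the probe path would avoid that, at the cost of a wider interface.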
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java	(date 1579279135796)
@@ -25,6 +25,7 @@
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -270,6 +271,7 @@
   protected transient boolean needHashTableSetup;
 
   // The small table hash table for the native vectorized map join operator.
+  // A reference to this table is handed to the probe-decode reader (see getVectorMapJoinHashTable below).
   protected transient VectorMapJoinHashTable vectorMapJoinHashTable;
 
   protected transient long batchCounter;
@@ -625,16 +627,18 @@
 
   @Override
   protected void completeInitializationOp(Object[] os) throws HiveException {
-    // setup mapJoinTables and serdes
+    // Setup mapJoinTables and serdes -- async load from the cache, or from disk if spilled.
     super.completeInitializationOp(os);
 
     if (isTestingNoHashTableLoad) {
       return;
     }
 
+    // TODO: this local reference is probably not needed.
     MapJoinTableContainer mapJoinTableContainer =
         mapJoinTables[posSingleVectorMapJoinSmallTable];
 
+    // Set up the small-table hash table here.
     setUpHashTable();
   }
 
@@ -669,6 +673,13 @@
       VectorMapJoinTableContainer vectorMapJoinTableContainer =
           (VectorMapJoinTableContainer) mapJoinTables[posSingleVectorMapJoinSmallTable];
       vectorMapJoinHashTable = vectorMapJoinTableContainer.vectorMapJoinHashTable();
+      if (HiveConf.getBoolVar(getConfiguration(), HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED) &&
+          vectorMapJoinHashTable instanceof VectorMapJoinFastLongHashTable) {
+        String queryId = HiveConf.getVar(getConfiguration(), HiveConf.ConfVars.HIVEQUERYID);
+        // The hash table reference could now be propagated to the probe-decode reader.
+        LOG.info("ProbeDecode small hash table size: " + vectorMapJoinHashTable.size() +
+            " position: " + posSingleVectorMapJoinSmallTable);
+        LOG.info("ProbeDecode using queryId: " + queryId + " and cacheKey: " + this.getCacheKey());
+      }
       break;
     default:
@@ -846,6 +857,10 @@
     }
   }
 
+  public VectorMapJoinHashTable getVectorMapJoinHashTable() {
+    return this.vectorMapJoinHashTable;
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.MAPJOIN;
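This is the producer half of the handoff: the operator loads the small table asynchronously, and the LLAP reader pulls the reference through Includes.getProbeDecodeTable(). Because loading may complete after decoding starts, the reader has to tolerate a still-null table, which is why OrcEncodedDataConsumer below re-checks on every decodeBatch call. A minimal sketch of that polling handoff (the class is illustrative; the real code routes the reference through IncludesImpl rather than a shared holder):

    import java.util.concurrent.atomic.AtomicReference;

    // Sketch: async handoff of a hash table from the join operator to the decoder.
    final class HashTableHandoff<T> {
      private final AtomicReference<T> table = new AtomicReference<>();

      void publish(T t) {          // called by the operator once loading completes
        table.set(t);
      }

      T peek() {                   // called by the decoder once per batch; may return null
        return table.get();
      }
    }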
Index: llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java	(date 1579179393000)
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java	(date 1579283881804)
@@ -21,7 +21,11 @@
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -31,6 +35,10 @@
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
@@ -43,6 +51,13 @@
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashTable;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
@@ -74,6 +89,10 @@
   private final Includes includes;
   private TypeDescription[] batchSchemas;
   private boolean useDecimal64ColumnVectors;
+  // ProbeDecode extra variables.
+  private boolean mapJoinProbeDecodeEnabled = false;
+  private VectorMapJoinHashTable smallMapJoinTable = null;
+  private int probeDecodeColId = 0;
 
   public OrcEncodedDataConsumer(
       Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt,
@@ -82,6 +101,15 @@
     this.includes = includes; // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
+    this.smallMapJoinTable = includes.getProbeDecodeTable();
+    this.mapJoinProbeDecodeEnabled = HiveConf.getBoolVar(includes.getJobConf(), HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_ENABLED);
+    this.probeDecodeColId = HiveConf.getIntVar(includes.getJobConf(), HiveConf.ConfVars.HIVE_MAPJOIN_PROBEDECODE_COLKEY);
+    // TODO: the hash table could instead be fetched from the LLAP ObjectCache using a cache key
+    // derived from the query id (see LlapObjectCache / ObjectCacheFactory), rather than via Includes.
+    LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode enabled: " + mapJoinProbeDecodeEnabled);
   }
 
   public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {
@@ -142,6 +170,21 @@
       }
       previousStripeIndex = currentStripeIndex;
 
+      // Check once per call: the small-table hash table is loaded asynchronously,
+      // so keep polling until it becomes available.
+      if (this.mapJoinProbeDecodeEnabled && this.smallMapJoinTable != null) {
+        LlapIoImpl.LOG.info("ProbeDecode got hash table of size: " + this.smallMapJoinTable.size());
+      } else if (this.mapJoinProbeDecodeEnabled) {
+        this.smallMapJoinTable = includes.getProbeDecodeTable();
+        LlapIoImpl.LOG.info("ProbeDecode has not gotten the hash table yet: " + (this.smallMapJoinTable == null));
+      }
+
+      // For every row group.
       for (int i = 0; i < maxBatchesRG; i++) {
         // for last batch in row group, adjust the batch size
         if (i == maxBatchesRG - 1) {
@@ -152,6 +195,9 @@
         ColumnVectorBatch cvb = cvbPool.take();
         // assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
         cvb.size = batchSize;
+        boolean[] skipRows = new boolean[batchSize];
+        short skippedCount = 0;
+        // For every column of the row group.
         for (int idx = 0; idx < columnReaders.length; ++idx) {
           TreeReader reader = columnReaders[idx];
           if (cvb.cols[idx] == null) {
@@ -199,8 +245,37 @@
           ColumnVector cv = cvb.cols[idx];
           cv.reset();
           cv.ensureSize(batchSize, false);
-          reader.nextVector(cv, null, batchSize);
+          reader.nextVector(cv, skipRows, batchSize);
+          // After reading the (hopefully key) ColumnVector, use the populated hash table to fill
+          // a boolean skip-array that reduces the number of values decoded for the later columns.
+          if (this.mapJoinProbeDecodeEnabled && columnReaders.length > 1 && cv instanceof LongColumnVector &&
+              this.smallMapJoinTable != null && idx == this.probeDecodeColId) {
+            // TODO: handle the other ColumnVector types.
+            LongColumnVector dcv = (LongColumnVector) cv;
+            Arrays.fill(skipRows, false);
+            for (int row = 0; row < batchSize; row++) {
+              // Skip the row if dcv.vector[row] is not in the hash table.
+              if (!this.smallMapJoinTable.containsKey(Longs.toByteArray(dcv.vector[row]))) {
+                skipRows[row] = true;
+                skippedCount++;
+              }
+            }
+            // Propagate skipRows -> isNull to avoid probing again on the join side,
+            // e.g. in VectorMapJoinInnerLongOperator.
+            if (skippedCount > 0) {
+              dcv.noNulls = false;
+              System.arraycopy(skipRows, 0, dcv.isNull, 0, batchSize);
+            }
+          }
         }
+        LlapIoImpl.LOG.debug("ProbeDecode RG skipped rows: " + skippedCount + " for columns: " + columnReaders.length);
 
         // we are done reading a batch, send it to consumer for processing
         downstreamConsumer.consumeData(cvb);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java	(date 1579179393000)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java	(date 1579264945955)
@@ -63,6 +63,11 @@
     }
   }
 
+  @Override
+  public boolean containsKey(byte[] currentKey) {
+    return adaptContainsKey(currentKey);
+  }
+
   /*
    * A Unit Test convenience method for putting the key into the hash table using the
    * actual type.
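Isolated from the ORC plumbing, the filtering step at the heart of probe-decode is: decode the key column, test each key against the small-table hash table, and surface misses as nulls so downstream operators skip them without re-probing. A self-contained sketch of that step (illustrative class; LongPredicate stands in for the containsKey probe):

    import java.util.function.LongPredicate;

    final class ProbeDecodeFilter {
      // Sketch: mark rows whose keys miss the small-table hash table, in place.
      // Returns the number of rows skipped.
      static int filterBatch(long[] keys, boolean[] isNull, int batchSize, LongPredicate inSmallTable) {
        int skipped = 0;
        for (int row = 0; row < batchSize; row++) {
          if (!inSmallTable.test(keys[row])) {
            isNull[row] = true;   // the inner join treats null keys as non-matches
            skipped++;
          }
        }
        return skipped;
      }
    }

As in the patch, a caller that skips any rows must also clear the vector's noNulls flag; otherwise the isNull array is ignored by consumers.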