diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index c2ee54c11a..e309664953 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -25,6 +25,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -82,6 +85,7 @@
   private final SearchArgument sarg;
   private final VectorizedRowBatchCtx rbCtx;
   private final boolean isVectorized;
+  private final boolean probeDecodeEnabled;
   private VectorizedOrcAcidRowBatchReader acidReader;
   private final Object[] partitionValues;
 
@@ -195,6 +199,12 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
     this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx,
         schema, job, isAcidScan && acidReader.includeAcidColumns());
 
+    this.probeDecodeEnabled = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE);
+    if (this.probeDecodeEnabled) {
+      includes.setProbeDecodeContext(mapWork.getProbeDecodeContext());
+      LOG.info("LlapRecordReader ProbeDecode is enabled");
+    }
+
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
         sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
@@ -627,6 +637,9 @@ public float getProgress() throws IOException {
     private TypeDescription readerSchema;
     private JobConf jobConf;
 
+    // ProbeDecode Context for row-level filtering
+    private TableScanOperator.ProbeDecodeContext probeDecodeContext = null;
+
     public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
         VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema, JobConf jobConf,
         boolean includeAcidColumns) {
@@ -708,6 +721,10 @@ public SchemaEvolution createSchemaEvolution(TypeDescription fileSchema) {
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
 
+    public void setProbeDecodeContext(TableScanOperator.ProbeDecodeContext currProbeDecodeContext) {
+      this.probeDecodeContext = currProbeDecodeContext;
+    }
+
     @Override
     public List<Integer> getPhysicalColumnIds() {
       return filePhysicalColumnIds;
@@ -723,5 +740,39 @@ public SchemaEvolution createSchemaEvolution(TypeDescription fileSchema) {
       return OrcInputFormat.genIncludedTypes(
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
+
+    @Override
+    public String getQueryId() {
+      return HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+    }
+
+    @Override
+    public boolean isProbeDecodeEnabled() {
+      return this.probeDecodeContext != null;
+    }
+
+    @Override
+    public byte getProbeMjSmallTablePos() {
+      return this.probeDecodeContext.getMjSmallTablePos();
+    }
+
+    @Override
+    public int getProbeColIdx() {
+      // TODO: is this the best way to get the ColId?
+      Pattern pattern = Pattern.compile("_col([0-9]+)");
+      Matcher matcher = pattern.matcher(this.probeDecodeContext.getMjBigTableKeyColName());
+      return matcher.find() ? Integer.parseInt(matcher.group(1)) : -1;
+    }
+
+    @Override
+    public String getProbeColName() {
+      return this.probeDecodeContext.getMjBigTableKeyColName();
+    }
+
+    @Override
+    public String getProbeCacheKey() {
+      return this.probeDecodeContext.getMjSmallTableCacheKey();
+    }
   }
 }
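Reviewer note on `getProbeColIdx()` above: it relies on Hive's internal `_col<N>` naming convention for the MapJoin big-table key column and falls back to -1 when the name does not match (the in-code TODO flags this as provisional). A minimal standalone sketch of that parsing; class and method names here are hypothetical, not part of the patch:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring the regex in getProbeColIdx() above.
public class ProbeColIdxSketch {
  private static final Pattern INTERNAL_COL_NAME = Pattern.compile("_col([0-9]+)");

  // Returns the numeric suffix of an internal "_col<N>" column name, or -1.
  static int parseColIdx(String keyColName) {
    Matcher matcher = INTERNAL_COL_NAME.matcher(keyColName);
    return matcher.find() ? Integer.parseInt(matcher.group(1)) : -1;
  }

  public static void main(String[] args) {
    System.out.println(parseColIdx("_col7")); // prints 7
    System.out.println(parseColIdx("key"));   // prints -1 (no match)
  }
}
```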
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index a830c07c9e..e37379be8c 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -49,6 +49,12 @@
     List<Integer> getPhysicalColumnIds();
     List<Integer> getReaderLogicalColumnIds();
     TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
+    String getQueryId();
+    boolean isProbeDecodeEnabled();
+    byte getProbeMjSmallTablePos();
+    String getProbeCacheKey();
+    String getProbeColName();
+    int getProbeColIdx();
   }
 
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
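Note that the new probe accessors on `Includes` are only safe to call once `isProbeDecodeEnabled()` returns true: the `IncludesImpl` implementation above dereferences `probeDecodeContext` without a null check. A caller-side sketch under that assumption (the `logProbeInfo` helper is illustrative, not part of this patch):

```java
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;

// Illustrative helper (not part of the patch): every probe accessor is
// guarded behind isProbeDecodeEnabled(), since IncludesImpl dereferences
// its probeDecodeContext field directly and would throw NPE otherwise.
public class ProbeIncludesUsageSketch {
  static void logProbeInfo(ColumnVectorProducer.Includes includes) {
    if (!includes.isProbeDecodeEnabled()) {
      return; // no probe context was attached for this scan
    }
    String cacheKey = includes.getProbeCacheKey();  // small-table cache key
    String colName = includes.getProbeColName();    // big-table key column name
    int colIdx = includes.getProbeColIdx();         // parsed "_col<N>" index, or -1
    byte smallTablePos = includes.getProbeMjSmallTablePos();
    System.out.printf("probe: cacheKey=%s col=%s idx=%d pos=%d%n",
        cacheKey, colName, colIdx, smallTablePos);
  }
}
```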
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 1b41d4e476..b697a0d573 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -84,6 +84,10 @@ public OrcEncodedDataConsumer(
     this.includes = includes; // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
+    if (includes.isProbeDecodeEnabled()) {
+      LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode is enabled with cacheKey {} colIndex {} and colName {}",
+          this.includes.getProbeCacheKey(), this.includes.getProbeColIdx(), this.includes.getProbeColName());
+    }
   }
 
   public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {
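The whole feature is gated on `ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE`, which `LlapRecordReader` reads in its constructor. A small sketch of toggling that flag on a `JobConf`, assuming only the `HiveConf` accessors already used in the patch:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

// Sketch: flipping the flag that LlapRecordReader checks above.
// Uses the same HiveConf accessors that appear in the patch itself.
public class ProbeDecodeConfSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE, true);
    boolean enabled = HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE);
    System.out.println("probe decode enabled: " + enabled);
  }
}
```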