From 7c1fe0639bfa016061bb5c714246fa6ff18434d8 Mon Sep 17 00:00:00 2001
From: Panos Garefalakis
Date: Wed, 27 May 2020 11:51:48 +0100
Subject: [PATCH] Adding FilterContext as part of the LLAP ColumnVectorBatch
 (propagated in the ReadPipeline). Also moving the code that prepares
 VectorBatches in EncodedDataConsumer into a separate method -- this will be
 handy when dealing with row-filters later on.

Change-Id: I0177756e842e60f6850c966cfa44fe0d53df4a28
---
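Note for reviewers: a rough sketch of how a consumer at the end of the
ReadPipeline could honor the propagated filter. Only the FilterContext
accessors (isSelectedInUse/getSelected/getSelectedSize) and the
ColumnVectorBatch fields come from this patch; consumeBatch/consumeRow and
the surrounding loop are hypothetical names used purely for illustration:

    // Hypothetical consumer-side sketch, not part of this patch.
    void consumeBatch(ColumnVectorBatch cvb) {
      if (cvb.filterContext.isSelectedInUse()) {
        // A row-filter ran upstream: visit only the rows it selected.
        int[] selected = cvb.filterContext.getSelected();
        for (int i = 0; i < cvb.filterContext.getSelectedSize(); i++) {
          consumeRow(cvb, selected[i]);
        }
      } else {
        // No filter in use: visit every row in the batch.
        for (int row = 0; row < cvb.size; row++) {
          consumeRow(cvb, row);
        }
      }
    }
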
 .../llap/io/api/impl/ColumnVectorBatch.java   | 15 +++++++++++
 .../io/decode/OrcEncodedDataConsumer.java     | 27 +++++++++++--------
 2 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
index 19b0b55c56f..52dc072fa57 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
@@ -20,12 +20,17 @@
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+
+import java.util.Arrays;
 
 /**
  * Unlike VRB, doesn't have some fields, and doesn't have all columns
  * (non-selected, partition cols, cols for downstream ops, etc.)
+ * It does, however, hold the FilterContext of the VRB.
  */
 public class ColumnVectorBatch {
+  public MutableFilterContext filterContext;
   public ColumnVector[] cols;
   public int size;
 
@@ -34,6 +39,7 @@ public ColumnVectorBatch(int columnCount) {
   }
 
   public ColumnVectorBatch(int columnCount, int batchSize) {
+    this.filterContext = new VectorizedRowBatch(0);
     this.cols = new ColumnVector[columnCount];
     this.size = batchSize;
   }
@@ -51,6 +57,15 @@ public String toString() {
     if (size == 0) {
       return "";
     }
     StringBuilder b = new StringBuilder();
+    b.append("FilterContext used: ");
+    b.append(filterContext.isSelectedInUse());
+    b.append(", size: ");
+    b.append(filterContext.getSelectedSize());
+    b.append('\n');
+    b.append("Selected: ");
+    b.append(filterContext.isSelectedInUse() ? Arrays.toString(filterContext.getSelected()) : "[]");
+    b.append('\n');
+    b.append("Column vector types: ");
     for (int k = 0; k < cols.length; k++) {
       ColumnVector cv = cols[k];
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 79dba426596..9459a4ff64f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -152,17 +152,10 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
     }
 
     ColumnVectorBatch cvb = cvbPool.take();
+    cvb.filterContext.reset();
     // assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
     cvb.size = batchSize;
     for (int idx = 0; idx < columnReaders.length; ++idx) {
-      TreeReader reader = columnReaders[idx];
-      if (cvb.cols[idx] == null) {
-        // Orc store rows inside a root struct (hive writes it this way).
-        // When we populate column vectors we skip over the root struct.
-        cvb.cols[idx] = createColumn(batchSchemas[idx], VectorizedRowBatch.DEFAULT_SIZE, useDecimal64ColumnVectors);
-      }
-      trace.logTreeReaderNextVector(idx);
-
       /*
        * Currently, ORC's TreeReaderFactory class does this:
        *
@@ -198,9 +191,8 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
        * it doesn't get confused.
        *
        */
-      ColumnVector cv = cvb.cols[idx];
-      cv.reset();
-      cv.ensureSize(batchSize, false);
+      TreeReader reader = columnReaders[idx];
+      ColumnVector cv = prepareColumnVector(cvb, idx, batchSize);
       reader.nextVector(cv, null, batchSize);
     }
 
@@ -218,6 +210,19 @@
     }
   }
 
+  private ColumnVector prepareColumnVector(ColumnVectorBatch cvb, int idx, int batchSize) {
+    if (cvb.cols[idx] == null) {
+      // ORC stores rows inside a root struct (Hive writes it this way).
+      // When we populate column vectors we skip over the root struct.
+      cvb.cols[idx] = createColumn(batchSchemas[idx], VectorizedRowBatch.DEFAULT_SIZE, useDecimal64ColumnVectors);
+    }
+    trace.logTreeReaderNextVector(idx);
+    ColumnVector cv = cvb.cols[idx];
+    cv.reset();
+    cv.ensureSize(batchSize, false);
+    return cv;
+  }
+
   private void createColumnReaders(OrcEncodedColumnBatch batch,
       ConsumerStripeMetadata stripeMetadata, TypeDescription fileSchema) throws IOException {
     TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext()
-- 
2.20.1 (Apple Git-117)