diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 2597848..32dc222 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -101,6 +101,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
       }
       previousStripeIndex = currentStripeIndex;
 
+      ColumnVectorBatch sarged_cvb = null;
       for (int i = 0; i < maxBatchesRG; i++) {
         // for last batch in row group, adjust the batch size
         if (i == maxBatchesRG - 1) {
@@ -116,9 +117,27 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
           cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
         }
 
-        // we are done reading a batch, send it to consumer for processing
-        downstreamConsumer.consumeData(cvb);
-        counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize);
+        while (cvb.size > 0) {
+          if (sarged_cvb == null) {
+            sarged_cvb = cvbPool.take();
+          }
+          // copy the rows selected by the SARG from cvb into the accumulator batch;
+          // pickRow is expected to stop when sarged_cvb is full and to update cvb.size
+          sargApp.pickRow(cvb, sarged_cvb);
+          assert sarged_cvb.size <= VectorizedRowBatch.DEFAULT_SIZE;
+          if (sarged_cvb.size >= VectorizedRowBatch.DEFAULT_SIZE) {
+            // accumulator is full: emit it and start a new one on the next pass
+            counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, sarged_cvb.size);
+            downstreamConsumer.consumeData(sarged_cvb);
+            sarged_cvb = null;
+          }
+        }
+        // the source batch has been fully consumed; return it to the pool
+        cvbPool.offer(cvb);
+      }
+      // flush any selected rows that did not fill a whole batch
+      if (sarged_cvb != null) {
+        counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, sarged_cvb.size);
+        downstreamConsumer.consumeData(sarged_cvb);
       }
       counters.incrTimeCounter(QueryFragmentCounters.Counter.DECODE_TIME_US, startTime);
       counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, maxBatchesRG);
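
For reviewers, below is a small, self-contained sketch of the accumulate-and-flush pattern the new loop implements. This is not the patch itself: Batch, MAX_SIZE, pickRow, and the IntPredicate are hypothetical stand-ins for ColumnVectorBatch, VectorizedRowBatch.DEFAULT_SIZE, sargApp.pickRow, and the SARG, and the sketch tracks a read position explicitly instead of letting pickRow shrink the source batch size, but the emit-when-full and flush-remainder logic mirrors the loop above.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.IntPredicate;

    // Standalone model of the filter-accumulate-flush loop in decodeBatch.
    public class SargBatchingSketch {
      static final int MAX_SIZE = 4;  // stand-in for VectorizedRowBatch.DEFAULT_SIZE

      static class Batch {               // stand-in for ColumnVectorBatch
        int[] rows = new int[MAX_SIZE];
        int size = 0;
      }

      // Moves rows that pass the filter from src into dst, starting at src's read
      // position, and stops as soon as dst is full; returns the new read position.
      // This mirrors the contract assumed for sargApp.pickRow(cvb, sarged_cvb).
      static int pickRow(Batch src, int readPos, Batch dst, IntPredicate sarg) {
        while (readPos < src.size && dst.size < MAX_SIZE) {
          int row = src.rows[readPos++];
          if (sarg.test(row)) {
            dst.rows[dst.size++] = row;
          }
        }
        return readPos;
      }

      public static void main(String[] args) {
        // Source data split into decoder-sized batches, as decodeBatch would produce them.
        int[][] input = { {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12} };
        IntPredicate sarg = row -> row % 2 == 0;   // "SARG": keep even rows only
        List<int[]> emitted = new ArrayList<>();   // stand-in for downstreamConsumer

        Batch acc = null;                          // plays the role of sarged_cvb
        for (int[] rows : input) {
          Batch cvb = new Batch();
          System.arraycopy(rows, 0, cvb.rows, 0, rows.length);
          cvb.size = rows.length;

          int readPos = 0;
          while (readPos < cvb.size) {             // until the source batch is drained
            if (acc == null) {
              acc = new Batch();                   // cvbPool.take() in the real code
            }
            readPos = pickRow(cvb, readPos, acc, sarg);
            if (acc.size >= MAX_SIZE) {            // accumulator full: emit and reset
              emitted.add(Arrays.copyOf(acc.rows, acc.size));
              acc = null;
            }
          }
          // cvbPool.offer(cvb) would go here
        }
        if (acc != null && acc.size > 0) {         // flush the partially filled remainder
          emitted.add(Arrays.copyOf(acc.rows, acc.size));
        }

        emitted.forEach(b -> System.out.println(Arrays.toString(b)));
        // prints [2, 4, 6, 8] and then [10, 12]
      }
    }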