diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index f2d2832afa..02d7cd7777 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -18,12 +18,12 @@ package org.apache.hadoop.hive.llap.io.decode; import java.lang.management.ThreadMXBean; +import java.lang.ref.WeakReference; import java.util.concurrent.Callable; import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.llap.ConsumerFeedback; -import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; @@ -31,12 +31,8 @@ import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; -import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; import org.apache.hive.common.util.FixedSizedObjectPool; -import org.apache.orc.TypeDescription; -import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.runtime.task.TaskRunner2Callable; public abstract class EncodedDataConsumer> implements Consumer, ReadPipeline { @@ -47,26 +43,29 @@ private final LlapDaemonIOMetrics ioMetrics; // Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema. private static final int CVB_POOL_SIZE = 128; - protected final FixedSizedObjectPool cvbPool; + protected final FixedSizedObjectPool> cvbPool; protected final QueryFragmentCounters counters; private final ThreadMXBean mxBean; + protected final int colCount; public EncodedDataConsumer(Consumer consumer, final int colCount, LlapDaemonIOMetrics ioMetrics, QueryFragmentCounters counters) { this.downstreamConsumer = consumer; this.ioMetrics = ioMetrics; this.mxBean = LlapUtil.initThreadMxBean(); - cvbPool = new FixedSizedObjectPool(CVB_POOL_SIZE, - new Pool.PoolObjectHelper() { - @Override - public ColumnVectorBatch create() { - return new ColumnVectorBatch(colCount); - } - @Override - public void resetBeforeOffer(ColumnVectorBatch t) { - // Don't reset anything, we are reusing column vectors. - } - }); + this.colCount = colCount; + cvbPool = new FixedSizedObjectPool<>(CVB_POOL_SIZE, + new Pool.PoolObjectHelper>() { + @Override + public WeakReference create() { + return new WeakReference<>(new ColumnVectorBatch(colCount)); + } + + @Override + public void resetBeforeOffer(final WeakReference columnVectorBatchWeakReference) { + // Don't reset anything, we are reusing column vectors. + } + }); this.counters = counters; } @@ -153,7 +152,7 @@ public void setError(Throwable t) throws InterruptedException { @Override public void returnData(ColumnVectorBatch data) { - cvbPool.offer(data); + cvbPool.offer(new WeakReference<>(data)); } @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 83931c27b1..04c7de6219 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -149,7 +149,11 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, if (batchSize == 0) break; } - ColumnVectorBatch cvb = cvbPool.take(); + ColumnVectorBatch cvb = cvbPool.take().get(); + // weak ref might have been GC'ed. Create a strong ref which will get discard after processing. + if (cvb == null) { + cvb = new ColumnVectorBatch(colCount); + } // assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split. cvb.size = batchSize; for (int idx = 0; idx < columnReaders.length; ++idx) {