diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index f2d2832afaf5f4d5a885abc5b7a22473b7a7b033..e77ffc3521554519bec97f13dc6a47d74c055a7b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -153,6 +155,12 @@ public void setError(Throwable t) throws InterruptedException {
 
   @Override
   public void returnData(ColumnVectorBatch data) {
+    //In case a writer has a lock on any of the vectors we don't return it to the pool.
+    for (ColumnVector cv : data.cols) {
+      if (cv.getRef() > 0) {
+        return;
+      }
+    }
     cvbPool.offer(data);
   }
 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
index ca6d696ea1f71dece19fca415673be9321669765..7eb349b6ea4e32523b59d7cbcdc3f0abed6dc5ea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
@@ -374,6 +374,10 @@ private void addBatchToWriter() throws IOException {
     if (!isAsync) {
       orcWriter.addRowBatch(destinationBatch);
     } else {
+      //Lock ColumnVectors so we don't accidentally reset them before they're written out
+      for (ColumnVector cv : destinationBatch.cols) {
+        cv.incRef();
+      }
       currentBatches.add(destinationBatch);
       addWriteOp(new VrbOperation(destinationBatch));
     }
@@ -447,6 +451,12 @@ public VrbOperation(VectorizedRowBatch batch) {
     public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
       // LlapIoImpl.LOG.debug("Writing batch " + batch);
       writer.addRowBatch(batch);
+      for (ColumnVector cv : batch.cols) {
+        // Release the lock outside of the assert: assert expressions are not evaluated
+        // when assertions are disabled, which would leave the vector locked forever.
+        int remainingRefs = cv.decRef();
+        assert remainingRefs == 0;
+      }
       return false;
     }
   }
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index f2ad6d2fb013096532adca2438db9ea5c8533b94..a6c80d316ea3de350305235f9c502bf3c88801dd 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * ColumnVector contains the shared structure for the sub-types,
@@ -32,6 +33,10 @@
  */
 public abstract class ColumnVector {
 
+
+  /** Reference count. */
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
   /**
    * The current kinds of column vectors.
    */
@@ -95,6 +100,7 @@ public ColumnVector(Type type, int len) {
    *  - sets isRepeating to false
    */
   public void reset() {
+    assert (refCount.get() == 0);
     if (!noNulls) {
       Arrays.fill(isNull, false);
     }
@@ -104,6 +110,28 @@ public void reset() {
     preFlattenIsRepeating = false;
   }
 
+
+  /** Increments the reference count of this vector. */
+  public final void incRef() {
+    refCount.incrementAndGet();
+  }
+
+  /** Returns the current reference count of this vector. */
+  public final int getRef() {
+    return refCount.get();
+  }
+
+  /**
+   * Decrements the reference count of this vector.
+   *
+   * @return the reference count after the decrement
+   */
+  public final int decRef() {
+    int i = refCount.decrementAndGet();
+    assert i >= 0;
+    return i;
+  }
+
   /**
    * Sets the isRepeating flag. Recurses over structs and unions so that the
    * flags are set correctly.
@@ -258,5 +286,6 @@ public void shallowCopyTo(ColumnVector otherCv) {
     otherCv.isRepeating = isRepeating;
     otherCv.preFlattenIsRepeating = preFlattenIsRepeating;
     otherCv.preFlattenNoNulls = preFlattenNoNulls;
+    otherCv.refCount.set(refCount.get());
   }
 }