diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index f2d2832afaf5f4d5a885abc5b7a22473b7a7b033..84436bc49520849dddafc922f310ec8d40743da5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; import org.apache.hive.common.util.FixedSizedObjectPool; @@ -153,6 +154,12 @@ public void setError(Throwable t) throws InterruptedException { @Override public void returnData(ColumnVectorBatch data) { + //In case a writer has a lock on any of the vectors we don't return it to the pool. 
+ for (ColumnVector cv : data.cols) { + if (cv != null && cv.getRef() > 0) { + return; + } + } cvbPool.offer(data); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java index ca6d696ea1f71dece19fca415673be9321669765..c100d6ec3be3b5785a634c201c4475e56836ee98 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java @@ -369,11 +369,17 @@ private void propagateSourceBatchFieldsToDest() { destinationBatch.endOfFile = sourceBatch.endOfFile; } - private void addBatchToWriter() throws IOException { + void addBatchToWriter() throws IOException { propagateSourceBatchFieldsToDest(); if (!isAsync) { orcWriter.addRowBatch(destinationBatch); } else { + //Lock ColumnVectors so we don't accidentally reset them before they're written out + for (ColumnVector cv : destinationBatch.cols) { + if (cv != null) { + cv.incRef(); + } + } currentBatches.add(destinationBatch); addWriteOp(new VrbOperation(destinationBatch)); } @@ -431,7 +437,7 @@ public void close() throws IOException { return result; } - private static interface WriteOperation { + interface WriteOperation { boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException; } @@ -447,6 +453,11 @@ public VrbOperation(VectorizedRowBatch batch) { public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { // LlapIoImpl.LOG.debug("Writing batch " + batch); writer.addRowBatch(batch); + for (ColumnVector cv : batch.cols) { + if (cv != null) { + int remainingRefCount = cv.decRef(); assert remainingRefCount == 0; + } + } return false; } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java new file mode 
100644 index 0000000000000000000000000000000000000000..ef7b1a365116cd04f13c8c5dc92085e50cf7156b --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.io.encoded; + +import java.util.ArrayList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.WriterImpl; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hive.common.util.FixedSizedObjectPool; +import org.apache.orc.impl.SchemaEvolution; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.withSettings; +import static org.mockito.internal.util.reflection.Whitebox.getInternalState; +import static org.mockito.internal.util.reflection.Whitebox.setInternalState; + +/** + * Unit tests for VectorDeserializeOrcWriter. 
+ */ +public class TestVectorDeserializeOrcWriter { + + private static final int TEST_NUM_COLS = 2; + + @Test + public void testConcurrencyIssueWhileWriting() throws Exception { + + //Setup//////////////////////////////////////////////////////////////////////////////////////// + EncodedDataConsumer consumer = createBlankEncodedDataConsumer(); + FixedSizedObjectPool<ColumnVectorBatch> cvbPool = (FixedSizedObjectPool<ColumnVectorBatch>) + getInternalState(consumer, "cvbPool"); + + ColumnVectorBatch cvb = new ColumnVectorBatch(TEST_NUM_COLS); + VectorizedRowBatch vrb = new VectorizedRowBatch(TEST_NUM_COLS); + createTestVectors(cvb, vrb); + + Queue<VectorDeserializeOrcWriter.WriteOperation> writeOpQueue = new ConcurrentLinkedQueue<>(); + VectorDeserializeOrcWriter orcWriter = createOrcWriter(writeOpQueue, vrb); + + + //Simulating unfortunate order of events/////////////////////////////////////////////////////// + //Add CVs to writer -> should increase their refcount + //Happens when IO thread has generated a vector batch and hands it over to async ORC thread + orcWriter.addBatchToWriter(); + + //Return CVs to pool -> should check their refcount, and as they're 1, this should be a no-op + //Happens when LLAPRecordReader on Tez thread received and used the batch and now wants to + // return it for CVB recycling + consumer.returnData(cvb); + + //Do the write -> should decrease the refcount of CVs + //Happens when ORC thread gets to writing and hands the vectors of the batch over to ORC + // WriterImpl for encoding and cache storage + writeOpQueue.poll().apply(mock(WriterImpl.class), null); + + + //Verifications//////////////////////////////////////////////////////////////////////////////// + //Pool should be empty as the CVB return should have been a no-op, so this call should create a + // NEW instance of CVBs + ColumnVectorBatch newCvb = cvbPool.take(); + assertNotEquals(newCvb, cvb); + + //Simulating a 'clean' CVB return -> the CVB now does have to make its way back to the pool + consumer.returnData(cvb); + newCvb = cvbPool.take(); + 
assertEquals(newCvb, cvb); + } + + private static void createTestVectors(ColumnVectorBatch cvb, VectorizedRowBatch vrb) { + for (int i = 0; i < TEST_NUM_COLS; ++i) { + LongColumnVector cv = new LongColumnVector(); + cv.fill(i); + cvb.cols[i] = cv; + vrb.cols[i] = cv; + } + } + + private static VectorDeserializeOrcWriter createOrcWriter( + Queue<VectorDeserializeOrcWriter.WriteOperation> writeOpQueue, VectorizedRowBatch vrb) { + VectorDeserializeOrcWriter orcWriter = mock(VectorDeserializeOrcWriter.class, + withSettings().defaultAnswer(CALLS_REAL_METHODS)); + setInternalState(orcWriter, "sourceBatch", vrb); + setInternalState(orcWriter, "destinationBatch", vrb); + setInternalState(orcWriter, "currentBatches", new ArrayList<>()); + setInternalState(orcWriter, "queue", writeOpQueue); + setInternalState(orcWriter, "isAsync", true); + return orcWriter; + } + + private static EncodedDataConsumer createBlankEncodedDataConsumer() { + return new EncodedDataConsumer(null, 1, null, null) { + @Override + protected void decodeBatch(EncodedColumnBatch batch, Consumer downstreamConsumer) + throws InterruptedException { + } + + @Override + public SchemaEvolution getSchemaEvolution() { + return null; + } + + @Override + public void consumeData(Object data) throws InterruptedException { + } + }; + } + +} diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java index f2ad6d2fb013096532adca2438db9ea5c8533b94..9f611dfd313be242e461d3cc6ec923fefce680f1 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java +++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; /** * ColumnVector contains the shared structure for the sub-types, @@ -32,6 +33,10 @@ */ public abstract class ColumnVector { + + /** Reference count. 
*/ + private AtomicInteger refCount = new AtomicInteger(0); + /** * The current kinds of column vectors. */ @@ -95,6 +100,7 @@ public ColumnVector(Type type, int len) { * - sets isRepeating to false */ public void reset() { + assert (refCount.get() == 0); if (!noNulls) { Arrays.fill(isNull, false); } @@ -104,6 +110,21 @@ public void reset() { preFlattenIsRepeating = false; } + + public final void incRef() { + refCount.incrementAndGet(); + } + + public final int getRef() { + return refCount.get(); + } + + public final int decRef() { + int i = refCount.decrementAndGet(); + assert i >= 0; + return i; + } + /** * Sets the isRepeating flag. Recurses over structs and unions so that the * flags are set correctly. @@ -258,5 +279,6 @@ public void shallowCopyTo(ColumnVector otherCv) { otherCv.isRepeating = isRepeating; otherCv.preFlattenIsRepeating = preFlattenIsRepeating; otherCv.preFlattenNoNulls = preFlattenNoNulls; + otherCv.refCount = refCount; } }