diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 398698ec06..c5f1550ca8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -107,7 +107,6 @@
   final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
 
   private static final Charset UTF8 = Charset.forName("UTF-8");
-  private static final CharsetDecoder utf8Decoder = UTF8.newDecoder();
 
   private final AcidOutputFormat.Options options;
   private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
@@ -649,6 +648,7 @@ public long getBufferedRowCount() {
       ByteBuffer val =
           reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
               .duplicate();
+      CharsetDecoder utf8Decoder = UTF8.newDecoder();
       stripes = utf8Decoder.decode(val).toString().split(";");
     } catch (CharacterCodingException e) {
       throw new IllegalArgumentException("Bad string encoding for " +
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index ef6dbbbd52..06a2d8d6e5 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -21,11 +21,21 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -314,4 +324,44 @@ public void testUpdates() throws Exception {
     assertEquals(false, rows.hasNext());
   }
+
+  /*
+  CharsetDecoder instances are not thread safe, so a shared instance can end up in an inconsistent state when
+  decoding multiple buffers in parallel.
+  E.g.:
+  java.lang.IllegalStateException: Current state = FLUSHED, new state = CODING_END
+   */
+  @Test
+  public void testConcurrentParseKeyIndex() throws Exception {
+
+    // Given
+    Reader mockReader = mock(Reader.class);
+    when(mockReader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)).thenReturn(true);
+
+    // Create a large buffer
+    final StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 3000; i++) {
+      sb.append("100000,200000,300000;");
+    }
+    when(mockReader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)).thenReturn(
+        ByteBuffer.wrap(sb.toString().getBytes()));
+
+    // When
+    // Hit OrcRecordUpdater.parseKeyIndex with large parallelism
+    final int parallelism = 4000;
+    Callable<RecordIdentifier[]>[] r = new Callable[parallelism];
+    for (int i = 0; i < parallelism; i++) {
+      r[i] = () -> {
+        return OrcRecordUpdater.parseKeyIndex(mockReader);
+      };
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
+    List<Future<RecordIdentifier[]>> res = executorService.invokeAll(Arrays.asList(r));
+
+    // Then
+    // Check for exceptions
+    for (Future<RecordIdentifier[]> ri : res) {
+      ri.get();
+    }
+  }
 
 }
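
For context, the race this patch fixes can be reproduced outside Hive with plain Java. The sketch below is illustrative only and not part of the patch (the class name, pool size, and buffer contents are made up): it shares a single CharsetDecoder across threads the way the removed static utf8Decoder field did. CharsetDecoder.decode(ByteBuffer) internally runs reset(), a decode loop, and flush(), so concurrent callers interleave those state transitions and typically fail with the same IllegalStateException quoted in the test comment.

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical standalone repro, not Hive code.
public class SharedDecoderRace {
  // One decoder shared by all threads, analogous to the removed static utf8Decoder field.
  private static final CharsetDecoder SHARED = Charset.forName("UTF-8").newDecoder();

  public static void main(String[] args) throws Exception {
    // A buffer big enough that decode() spends real time inside its state machine.
    byte[] bytes = new byte[64 * 1024];
    Arrays.fill(bytes, (byte) 'a');

    ExecutorService pool = Executors.newFixedThreadPool(16);
    List<Callable<String>> tasks = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      // Each task wraps its own ByteBuffer but shares the decoder.
      tasks.add(() -> SHARED.decode(ByteBuffer.wrap(bytes)).toString());
    }
    try {
      for (Future<String> f : pool.invokeAll(tasks)) {
        // Typically throws ExecutionException caused by
        // java.lang.IllegalStateException: Current state = FLUSHED, new state = CODING_END
        f.get();
      }
    } finally {
      pool.shutdown();
    }
  }
}

The fix takes the cheap-allocation route: UTF8.newDecoder() per parseKeyIndex call, which is safe because each thread gets its own decoder state. The alternative of synchronizing on the shared decoder would serialize all key-index parsing across threads.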