diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 6d06e9e..851ea1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -583,15 +583,15 @@ private String toErrorMessage(Writable value, Object row, ObjectInspector inspec } } else if(vc.equals(VirtualColumn.ROWID)) { - if(ctx.getIoCxt().ri == null) { + if(ctx.getIoCxt().getRecordIdentifier() == null) { vcValues[i] = null; } else { if(vcValues[i] == null) { vcValues[i] = new Object[RecordIdentifier.Field.values().length]; } - RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().ri, (Object[])vcValues[i]); - ctx.getIoCxt().ri = null;//so we don't accidentally cache the value; shouldn't + RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getRecordIdentifier(), (Object[])vcValues[i]); + ctx.getIoCxt().setRecordIdentifier(null);//so we don't accidentally cache the value; shouldn't //happen since IO layer either knows how to produce ROW__ID or not - but to be safe } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index 96d7b1e..30fe7fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -116,11 +116,11 @@ public boolean next(K key, V value) throws IOException { if(retVal) { if(key instanceof RecordIdentifier) { //supports AcidInputFormat which uses the KEY pass ROW__ID info - ioCxtRef.ri = (RecordIdentifier)key; + ioCxtRef.setRecordIdentifier((RecordIdentifier)key); } else if(recordReader instanceof AcidInputFormat.AcidRecordReader) { //supports AcidInputFormat which do not use the KEY pass ROW__ID info - ioCxtRef.ri = ((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier(); + ioCxtRef.setRecordIdentifier(((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier()); } } return retVal; @@ -133,30 +133,30 @@ else if(recordReader instanceof AcidInputFormat.AcidRecordReader) { protected void updateIOContext() throws IOException { long pointerPos = this.getPos(); - if (!ioCxtRef.isBlockPointer) { - ioCxtRef.currentBlockStart = pointerPos; - ioCxtRef.currentRow = 0; + if (!ioCxtRef.isBlockPointer()) { + ioCxtRef.setCurrentBlockStart(pointerPos); + ioCxtRef.setCurrentRow(0); return; } - ioCxtRef.currentRow++; + ioCxtRef.setCurrentRow(ioCxtRef.getCurrentRow() + 1); - if (ioCxtRef.nextBlockStart == -1) { - ioCxtRef.nextBlockStart = pointerPos; - ioCxtRef.currentRow = 0; + if (ioCxtRef.getNextBlockStart() == -1) { + ioCxtRef.setNextBlockStart(pointerPos); + ioCxtRef.setCurrentRow(0); } - if (pointerPos != ioCxtRef.nextBlockStart) { + if (pointerPos != ioCxtRef.getNextBlockStart()) { // the reader pointer has moved to the end of next block, or the end of // current record. - ioCxtRef.currentRow = 0; + ioCxtRef.setCurrentRow(0); - if (ioCxtRef.currentBlockStart == ioCxtRef.nextBlockStart) { - ioCxtRef.currentRow = 1; + if (ioCxtRef.getCurrentBlockStart() == ioCxtRef.getNextBlockStart()) { + ioCxtRef.setCurrentRow(1); } - ioCxtRef.currentBlockStart = ioCxtRef.nextBlockStart; - ioCxtRef.nextBlockStart = pointerPos; + ioCxtRef.setCurrentBlockStart(ioCxtRef.getNextBlockStart()); + ioCxtRef.setNextBlockStart(pointerPos); } } @@ -167,9 +167,9 @@ public IOContext getIOContext() { private void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) { ioCxtRef = this.getIOContext(); - ioCxtRef.currentBlockStart = startPos; - ioCxtRef.isBlockPointer = isBlockPointer; - ioCxtRef.inputPath = inputPath; + ioCxtRef.setCurrentBlockStart(startPos); + ioCxtRef.setBlockPointer(isBlockPointer); + ioCxtRef.setInputPath(inputPath); LOG.info("Processing file " + inputPath); initDone = true; } @@ -222,7 +222,7 @@ public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, // Binary search only works if we know the size of the split, and the recordReader is an // RCFileRecordReader this.getIOContext().setUseSorted(true); - this.getIOContext().setIsBinarySearching(true); + this.getIOContext().setBinarySearching(true); this.wasUsingSortedSearch = true; } else { // Use the defalut methods for next in the child class @@ -284,7 +284,7 @@ public boolean doNext(K key, V value) throws IOException { // binary search, if the new position at least as big as the size of the split, any // matching rows must be in the final block, so we can end the binary search. if (newPosition == previousPosition || newPosition >= splitEnd) { - this.getIOContext().setIsBinarySearching(false); + this.getIOContext().setBinarySearching(false); sync(rangeStart); } @@ -402,7 +402,7 @@ private void setGenericUDFClassName(String genericUDFClassName) throws IOExcepti */ private void beginLinearSearch() throws IOException { sync(rangeStart); - this.getIOContext().setIsBinarySearching(false); + this.getIOContext().setBinarySearching(false); this.wasUsingSortedSearch = false; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index d42f568..ac47975 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -34,18 +34,21 @@ */ public class IOContext { - private static ThreadLocal threadLocal = new ThreadLocal(){ + /** + * Spark uses this thread local + */ + private static final ThreadLocal threadLocal = new ThreadLocal(){ @Override protected synchronized IOContext initialValue() { return new IOContext(); } }; - private static Map inputNameIOContextMap = new HashMap(); - public static Map getMap() { - return inputNameIOContextMap; - } + /** + * Tez and MR use this map but are single threaded per JVM thus no synchronization is required. + */ + private static final Map inputNameIOContextMap = new HashMap(); public static IOContext get(String inputName) { - if (inputNameIOContextMap.containsKey(inputName) == false) { + if (!inputNameIOContextMap.containsKey(inputName)) { IOContext ioContext = new IOContext(); inputNameIOContextMap.put(inputName, ioContext); } @@ -58,26 +61,26 @@ public static void clear() { inputNameIOContextMap.clear(); } - long currentBlockStart; - long nextBlockStart; - long currentRow; - boolean isBlockPointer; - boolean ioExceptions; + private long currentBlockStart; + private long nextBlockStart; + private long currentRow; + private boolean isBlockPointer; + private boolean ioExceptions; // Are we using the fact the input is sorted - boolean useSorted = false; + private boolean useSorted = false; // Are we currently performing a binary search - boolean isBinarySearching = false; + private boolean isBinarySearching = false; // Do we want to end the binary search - boolean endBinarySearch = false; + private boolean endBinarySearch = false; // The result of the comparison of the last row processed - Comparison comparison = null; + private Comparison comparison = null; // The class name of the generic UDF being used by the filter - String genericUDFClassName = null; + private String genericUDFClassName = null; /** * supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID} */ - public RecordIdentifier ri; + private RecordIdentifier ri; public static enum Comparison { GREATER, @@ -86,7 +89,7 @@ public static void clear() { UNKNOWN } - Path inputPath; + private Path inputPath; public IOContext() { this.currentBlockStart = 0; @@ -156,7 +159,7 @@ public boolean isBinarySearching() { return isBinarySearching; } - public void setIsBinarySearching(boolean isBinarySearching) { + public void setBinarySearching(boolean isBinarySearching) { this.isBinarySearching = isBinarySearching; } @@ -197,6 +200,14 @@ public void setGenericUDFClassName(String genericUDFClassName) { this.genericUDFClassName = genericUDFClassName; } + public RecordIdentifier getRecordIdentifier() { + return this.ri; + } + + public void setRecordIdentifier(RecordIdentifier ri) { + this.ri = ri; + } + /** * The thread local IOContext is static, we may need to restart the search if, for instance, * multiple files are being searched as part of a CombinedHiveRecordReader diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java index 6a10827..f36f8e0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java @@ -118,7 +118,7 @@ private void resetIOContext() { conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader"); ioContext = IOContext.get(conf.get(Utilities.INPUT_NAME)); ioContext.setUseSorted(false); - ioContext.setIsBinarySearching(false); + ioContext.setBinarySearching(false); ioContext.setEndBinarySearch(false); ioContext.setComparison(null); ioContext.setGenericUDFClassName(null); @@ -252,7 +252,7 @@ public void testEqualOpClass() throws Exception { ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName()); Assert.assertTrue(ioContext.isBinarySearching()); Assert.assertTrue(executeDoNext(hbsReader)); - ioContext.setIsBinarySearching(false); + ioContext.setBinarySearching(false); ioContext.setComparison(-1); Assert.assertTrue(executeDoNext(hbsReader)); ioContext.setComparison(0); @@ -292,7 +292,7 @@ public void testGreaterThanOpClass() throws Exception { ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName()); Assert.assertTrue(ioContext.isBinarySearching()); Assert.assertTrue(executeDoNext(hbsReader)); - ioContext.setIsBinarySearching(false); + ioContext.setBinarySearching(false); ioContext.setComparison(-1); Assert.assertTrue(executeDoNext(hbsReader)); ioContext.setComparison(0); @@ -306,7 +306,7 @@ public void testGreaterThanOrEqualOpClass() throws Exception { ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName()); Assert.assertTrue(ioContext.isBinarySearching()); Assert.assertTrue(executeDoNext(hbsReader)); - ioContext.setIsBinarySearching(false); + ioContext.setBinarySearching(false); ioContext.setComparison(-1); Assert.assertTrue(executeDoNext(hbsReader)); ioContext.setComparison(0);