diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java index b928559..7dfd289 100644 --- a/orc/src/java/org/apache/orc/impl/IntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java @@ -60,8 +60,6 @@ * @return * @throws IOException */ - void nextVector(LongColumnVector previous, long previousLen) + void nextVector(LongColumnVector previous, final int previousLen) throws IOException; - - void setInStream(InStream data); } diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java index f129c86..0c90cde 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java @@ -99,7 +99,7 @@ public long next() throws IOException { } @Override - public void nextVector(LongColumnVector previous, long previousLen) throws IOException { + public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { @@ -122,11 +122,6 @@ public void nextVector(LongColumnVector previous, long previousLen) throws IOExc } @Override - public void setInStream(InStream data) { - input = data; - } - - @Override public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java index 5f2a673..c6d685a 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java @@ -360,7 +360,7 @@ public void skip(long numValues) throws IOException { } @Override - public void nextVector(LongColumnVector previous, long previousLen) throws IOException { + public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { @@ -382,9 +382,4 @@ public void nextVector(LongColumnVector previous, long previousLen) throws IOExc } } } - - @Override - public void setInStream(InStream data) { - input = data; - } } diff --git a/orc/src/java/org/apache/orc/impl/SerializationUtils.java b/orc/src/java/org/apache/orc/impl/SerializationUtils.java index c1162e4..78c28e6 100644 --- a/orc/src/java/org/apache/orc/impl/SerializationUtils.java +++ b/orc/src/java/org/apache/orc/impl/SerializationUtils.java @@ -18,8 +18,6 @@ package org.apache.orc.impl; -import org.apache.orc.impl.InStream; - import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -77,18 +75,22 @@ public long readVslong(InputStream in) throws IOException { } public float readFloat(InputStream in) throws IOException { - int ser = in.read() | (in.read() << 8) | (in.read() << 16) | - (in.read() << 24); - return Float.intBitsToFloat(ser); + in.read(readBuffer, 0, 4); + int val = (((readBuffer[0] & 0xff) << 0) + + ((readBuffer[1] & 0xff) << 8) + + ((readBuffer[2] & 0xff) << 16) + + ((readBuffer[3] & 0xff) << 24)); + return Float.intBitsToFloat(val); } public void writeFloat(OutputStream output, float value) throws IOException { int ser = Float.floatToIntBits(value); - output.write(ser & 0xff); - output.write((ser >> 8) & 0xff); - output.write((ser >> 16) & 0xff); - output.write((ser >> 24) & 0xff); + writeBuffer[0] = (byte) ((ser >> 0) & 0xff); + writeBuffer[1] = (byte) ((ser >> 8) & 0xff); + writeBuffer[2] = (byte) ((ser >> 16) & 0xff); + writeBuffer[3] = (byte) ((ser >> 24) & 0xff); + output.write(writeBuffer, 0, 4); } public double readDouble(InputStream in) throws IOException { diff --git a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java index 0785412..4a8a0f2 100644 --- a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java +++ b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java @@ -25,6 +25,9 @@ import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.junit.Test; @@ -156,9 +159,43 @@ public void testSubtractionOverflowGuava() { assertEquals(Long.MIN_VALUE, LongMath.checkedSubtract(Long.MIN_VALUE, 0)); } - public static void main(String[] args) throws Exception { - TestSerializationUtils test = new TestSerializationUtils(); - test.testDoubles(); - test.testBigIntegers(); + @Test + public void testRandomFloats() throws Exception { + float tolerance = 0.0000000000000001f; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + SerializationUtils utils = new SerializationUtils(); + Random rand = new Random(); + int n = 100_000; + float[] expected = new float[n]; + for (int i = 0; i < n; i++) { + float f = rand.nextFloat(); + expected[i] = f; + utils.writeFloat(buffer, f); + } + InputStream newBuffer = fromBuffer(buffer); + for (int i = 0; i < n; i++) { + float got = utils.readFloat(newBuffer); + assertEquals(expected[i], got, tolerance); + } + } + + @Test + public void testRandomDoubles() throws Exception { + double tolerance = 0.0000000000000001; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + SerializationUtils utils = new SerializationUtils(); + Random rand = new Random(); + int n = 100_000; + double[] expected = new double[n]; + for (int i = 0; i < n; i++) { + double d = rand.nextDouble(); + expected[i] = d; + utils.writeDouble(buffer, d); + } + InputStream newBuffer = fromBuffer(buffer); + for (int i = 0; i < n; i++) { + double got = utils.readDouble(newBuffer); + assertEquals(expected[i], got, tolerance); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index aa835ae..3975409 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -1060,7 +1060,7 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept readStripe(); } - long batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE); + final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE); rowInStripe += batchSize; if (previous == null) { @@ -1068,13 +1068,13 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept result = new VectorizedRowBatch(cols.length); result.cols = cols; } else { - result = (VectorizedRowBatch) previous; + result = previous; result.selectedInUse = false; reader.setVectorColumnCount(result.getDataColumnCount()); - reader.nextVector(result.cols, (int) batchSize); + reader.nextVector(result.cols, batchSize); } - result.size = (int) batchSize; + result.size = batchSize; advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); return result; } catch (IOException e) { @@ -1083,8 +1083,8 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept } } - private long computeBatchSize(long targetBatchSize) { - long batchSize = 0; + private int computeBatchSize(long targetBatchSize) { + final int batchSize; // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row // groups are selected then marker position is set to the end of range (subset of row groups // within strip). Batch size computed out of marker position makes sure that batch size is @@ -1106,13 +1106,13 @@ private long computeBatchSize(long targetBatchSize) { final long markerPosition = (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride) : rowCountInStripe; - batchSize = Math.min(targetBatchSize, (markerPosition - rowInStripe)); + batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe)); if (isLogDebugEnabled && batchSize < targetBatchSize) { LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize); } } else { - batchSize = Math.min(targetBatchSize, (rowCountInStripe - rowInStripe)); + batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe)); } return batchSize; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 620ad53..d74a854 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -239,7 +239,7 @@ Object next(Object previous) throws IOException { * @return next column vector * @throws IOException */ - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { ColumnVector result = (ColumnVector) previousVector; if (present != null) { // Set noNulls and isNull vector of the ColumnVector based on @@ -322,7 +322,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -387,7 +387,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -473,7 +473,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -559,7 +559,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -646,7 +646,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -719,7 +719,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, final long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final DoubleColumnVector result; if (previousVector == null) { result = new DoubleColumnVector(); @@ -832,7 +832,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, final long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final DoubleColumnVector result; if (previousVector == null) { result = new DoubleColumnVector(); @@ -974,7 +974,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final BytesColumnVector result; if (previousVector == null) { result = new BytesColumnVector(); @@ -1144,7 +1144,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final TimestampColumnVector result; if (previousVector == null) { result = new TimestampColumnVector(); @@ -1253,7 +1253,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -1352,7 +1352,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final DecimalColumnVector result; if (previousVector == null) { result = new DecimalColumnVector(precision, scale); @@ -1481,7 +1481,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { return reader.nextVector(previousVector, batchSize); } @@ -1498,7 +1498,7 @@ void skipRows(long items) throws IOException { private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv, - BytesColumnVector result, long batchSize) throws IOException { + BytesColumnVector result, final int batchSize) throws IOException { // Read lengths scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here... lengths.nextVector(scratchlcv, batchSize); @@ -1534,7 +1534,7 @@ void skipRows(long items) throws IOException { // This method has the common code for reading in bytes into a BytesColumnVector. public static void readOrcByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv, - BytesColumnVector result, long batchSize) throws IOException { + BytesColumnVector result, final int batchSize) throws IOException { byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize); @@ -1641,7 +1641,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final BytesColumnVector result; if (previousVector == null) { result = new BytesColumnVector(); @@ -1815,7 +1815,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final BytesColumnVector result; int offset; int length; @@ -1926,7 +1926,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (right trim and truncate) if necessary. BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); @@ -2000,7 +2000,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (truncate) if necessary. BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); @@ -2137,7 +2137,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final ColumnVector[] result; if (previousVector == null) { result = new ColumnVector[readColumnCount]; @@ -2242,7 +2242,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { throw new UnsupportedOperationException( "NextVector is not supported operation for Union type"); } @@ -2325,7 +2325,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previous, long batchSize) throws IOException { + public Object nextVector(Object previous, final int batchSize) throws IOException { throw new UnsupportedOperationException( "NextVector is not supported operation for List type"); } @@ -2419,7 +2419,7 @@ Object next(Object previous) throws IOException { } @Override - public Object nextVector(Object previous, long batchSize) throws IOException { + public Object nextVector(Object previous, final int batchSize) throws IOException { throw new UnsupportedOperationException( "NextVector is not supported operation for Map type"); }