diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java index 19a6d0e..55ac8c6 100644 --- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java +++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java @@ -5695,10 +5695,14 @@ public ColumnEncoding getDefaultInstanceForType() { implements com.google.protobuf.ProtocolMessageEnum { DIRECT(0, 0), DICTIONARY(1, 1), + DIRECT_V2(2, 2), + DICTIONARY_V2(3, 3), ; public static final int DIRECT_VALUE = 0; public static final int DICTIONARY_VALUE = 1; + public static final int DIRECT_V2_VALUE = 2; + public static final int DICTIONARY_V2_VALUE = 3; public final int getNumber() { return value; } @@ -5707,6 +5711,8 @@ public static Kind valueOf(int value) { switch (value) { case 0: return DIRECT; case 1: return DICTIONARY; + case 2: return DIRECT_V2; + case 3: return DICTIONARY_V2; default: return null; } } @@ -5737,7 +5743,7 @@ public Kind findValueByNumber(int number) { } private static final Kind[] VALUES = { - DIRECT, DICTIONARY, + DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2, }; public static Kind valueOf( @@ -11117,42 +11123,42 @@ void setMagic(com.google.protobuf.ByteString value) { "eam.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004", "\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGT" + "H\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_C" + - "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n" + + "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n" + "\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.apac" + "he.hadoop.hive.ql.io.orc.ColumnEncoding." + - "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006" + - "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFoote" + - "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" + - "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." 
+ - "org.apache.hadoop.hive.ql.io.orc.ColumnE", - "ncoding\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" + - "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s" + - "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001" + - "\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002" + - "\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE" + - "\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020" + - "\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNIO" + - "N\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInfo" + - "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" + - " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength", - "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" + - "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002" + - "\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconten" + - "tLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apa" + - "che.hadoop.hive.ql.io.orc.StripeInformat" + - "ion\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.h" + - "ive.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.o" + - "rg.apache.hadoop.hive.ql.io.orc.UserMeta" + - "dataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatis" + - "tics\030\007 \003(\01322.org.apache.hadoop.hive.ql.i", - "o.orc.ColumnStatistics\022\026\n\016rowIndexStride" + - "\030\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 " + - "\001(\004\022F\n\013compression\030\002 \001(\01621.org.apache.ha" + - "doop.hive.ql.io.orc.CompressionKind\022\034\n\024c" + - "ompressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003" + - "(\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKin" + - "d\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO" + - "\020\003" + "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006" + + "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022" + + "\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n\007s" + + "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" + + ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap", + "ache.hadoop.hive.ql.io.orc.ColumnEncodin" + + "g\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" + + "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" + + "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001\n\004Kind" + + "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" + + "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" + + "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" + + "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" + + "\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInformatio" + + "n\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022", + "\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004" + + "\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataIt" + + "em\022\014\n\004name\030\001 
\002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Foot" + + "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" + + "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" + + "doop.hive.ql.io.orc.StripeInformation\0225\n" + + "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql" + + ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" + + "che.hadoop.hive.ql.io.orc.UserMetadataIt" + + "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007", + " \003(\01322.org.apache.hadoop.hive.ql.io.orc." + + "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" + + "\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" + + "\013compression\030\002 \001(\01621.org.apache.hadoop.h" + + "ive.ql.io.orc.CompressionKind\022\034\n\024compres" + + "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" + + "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" + + "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java new file mode 100644 index 0000000..a023f92 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for reading integers. + */ +interface IntegerReader { + + /** + * Seek to the position provided by index. + * @param index + * @throws IOException + */ + void seek(PositionProvider index) throws IOException; + + /** + * Skip number of specified rows. + * @param numValues + * @throws IOException + */ + void skip(long numValues) throws IOException; + + /** + * Check if there are any more values left. + * @return + * @throws IOException + */ + boolean hasNext() throws IOException; + + /** + * Return the next available value. + * @return + * @throws IOException + */ + long next() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java new file mode 100644 index 0000000..17238e2 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for writing integers. + */ +interface IntegerWriter { + + /** + * Get position from the stream. + * @param recorder + * @throws IOException + */ + void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Write the integer value + * @param value + * @throws IOException + */ + void write(long value) throws IOException; + + /** + * Flush the buffer + * @throws IOException + */ + void flush() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 92413e1..e6e078a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -116,6 +116,7 @@ public long getNext() { protected final int columnId; private BitFieldReader present = null; protected boolean valuePresent = false; + protected boolean isDirectV2; TreeReader(Path path, int columnId) { this.path = path; @@ -123,7 +124,8 @@ public long getNext() { } void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { throw new IOException("Unknown encoding " + encoding + " in column " + columnId + " of " + path); } @@ -132,6 +134,10 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { void startStripe(Map streams, List encoding ) throws IOException { + if(encoding.get(columnId).getKind().equals(OrcProto.ColumnEncoding.Kind.DIRECT_V2) + || encoding.get(columnId).getKind().equals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)) { + this.isDirectV2 = true; + } checkEncoding(encoding.get(columnId)); InStream in = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.PRESENT)); @@ -265,7 +271,7 @@ void skipRows(long items) throws IOException { } private static class ShortTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; ShortTreeReader(Path path, int columnId) { super(path, columnId); @@ -278,7 +284,11 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (super.isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override @@ -309,7 +319,7 @@ void skipRows(long items) throws IOException { } private static class IntTreeReader extends TreeReader{ - private RunLengthIntegerReader 
reader = null; + private IntegerReader reader = null; IntTreeReader(Path path, int columnId) { super(path, columnId); @@ -322,7 +332,11 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override @@ -353,7 +367,7 @@ void skipRows(long items) throws IOException { } private static class LongTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; LongTreeReader(Path path, int columnId) { super(path, columnId); @@ -366,7 +380,11 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override @@ -491,7 +509,7 @@ void skipRows(long items) throws IOException { private static class BinaryTreeReader extends TreeReader{ private InStream stream; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; BinaryTreeReader(Path path, int columnId) { super(path, columnId); @@ -505,9 +523,15 @@ void startStripe(Map streams, StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), - false); + if (isDirectV2) { + lengths = new RunLengthIntegerReaderV2(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } } @Override @@ -546,7 +570,7 @@ Object next(Object previous) throws IOException { void skipRows(long items) throws IOException { items = countNonNulls(items); long lengthToSkip = 0; - for(int i=0; i < items; ++i) { + for (int i = 0; i < items; ++i) { lengthToSkip += lengths.next(); } stream.skip(lengthToSkip); @@ -554,8 +578,8 @@ void skipRows(long items) throws IOException { } private static class TimestampTreeReader extends TreeReader{ - private RunLengthIntegerReader data; - private RunLengthIntegerReader nanos; + private IntegerReader data = null; + private IntegerReader nanos = null; TimestampTreeReader(Path path, int columnId) { super(path, columnId); @@ -566,10 +590,17 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.DATA)), true); - nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.SECONDARY)), false); + if (isDirectV2) { + data = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); + } else { + data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), 
false); + } } @Override @@ -589,9 +620,12 @@ Object next(Object previous) throws IOException { } else { result = (Timestamp) previous; } - long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * + long millis = 0; + int newNanos = 0; + millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * WriterImpl.MILLIS_PER_SECOND; - int newNanos = parseNanos(nanos.next()); + newNanos = parseNanos(nanos.next()); + // fix the rounding when we divided by 1000. if (millis >= 0) { millis += newNanos / 1000000; @@ -669,7 +703,7 @@ void skipRows(long items) throws IOException { private static class DecimalTreeReader extends TreeReader{ private InStream valueStream; - private RunLengthIntegerReader scaleStream; + private IntegerReader scaleStream = null; DecimalTreeReader(Path path, int columnId) { super(path, columnId); @@ -682,8 +716,13 @@ void startStripe(Map streams, super.startStripe(streams, encodings); valueStream = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA)); - scaleStream = new RunLengthIntegerReader(streams.get( - new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + if (isDirectV2) { + scaleStream = new RunLengthIntegerReaderV2(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } else { + scaleStream = new RunLengthIntegerReader(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } } @Override @@ -717,14 +756,16 @@ void skipRows(long items) throws IOException { private DynamicByteArray dictionaryBuffer = null; private int dictionarySize; private int[] dictionaryOffsets; - private RunLengthIntegerReader reader; + private IntegerReader reader = null; StringTreeReader(Path path, int columnId) { super(path, columnId); } + @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY) { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)) { throw new IOException("Unknown encoding " + encoding + " in column " + columnId + " of " + path); } @@ -752,13 +793,18 @@ void startStripe(Map streams, // read the lengths name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); in = streams.get(name); - RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); + IntegerReader lenReader = null; + if(isDirectV2) { + lenReader = new RunLengthIntegerReaderV2(in, false); + } else { + lenReader = new RunLengthIntegerReader(in, false); + } int offset = 0; if (dictionaryOffsets == null || dictionaryOffsets.length < dictionarySize + 1) { dictionaryOffsets = new int[dictionarySize + 1]; } - for(int i=0; i < dictionarySize; ++i) { + for (int i = 0; i < dictionarySize; ++i) { dictionaryOffsets[i] = offset; offset += (int) lenReader.next(); } @@ -767,7 +813,11 @@ void startStripe(Map streams, // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), false); + if (isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), false); + } else { + reader = new RunLengthIntegerReader(streams.get(name), false); + } } @Override @@ -781,7 +831,8 @@ Object next(Object previous) throws IOException { super.next(previous); Text result = null; if (valuePresent) { - int entry = (int) reader.next(); + int entry = 0; + entry = (int) reader.next(); if (previous == null) { result = new Text(); } else { @@ -965,7 +1016,7 @@ void skipRows(long items) throws 
IOException { private static class ListTreeReader extends TreeReader { private final TreeReader elementReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; ListTreeReader(Path path, int columnId, List types, @@ -1018,8 +1069,13 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + if (isDirectV2) { + lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } if (elementReader != null) { elementReader.startStripe(streams, encodings); } @@ -1029,7 +1085,7 @@ void startStripe(Map streams, void skipRows(long items) throws IOException { items = countNonNulls(items); long childSkip = 0; - for(long i=0; i < items; ++i) { + for (long i = 0; i < items; ++i) { childSkip += lengths.next(); } elementReader.skipRows(childSkip); @@ -1039,7 +1095,7 @@ void skipRows(long items) throws IOException { private static class MapTreeReader extends TreeReader { private final TreeReader keyReader; private final TreeReader valueReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; MapTreeReader(Path path, int columnId, @@ -1096,8 +1152,13 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + if (isDirectV2) { + lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } if (keyReader != null) { keyReader.startStripe(streams, encodings); } @@ -1110,7 +1171,7 @@ void startStripe(Map streams, void skipRows(long items) throws IOException { items = countNonNulls(items); long childSkip = 0; - for(long i=0; i < items; ++i) { + for (long i = 0; i < items; ++i) { childSkip += lengths.next(); } keyReader.skipRows(childSkip); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java index 2825c64..9c825eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java @@ -23,7 +23,7 @@ /** * A reader that reads a sequence of integers. 
* */ -class RunLengthIntegerReader { +class RunLengthIntegerReader implements IntegerReader { private final InStream input; private final boolean signed; private final long[] literals = @@ -71,11 +71,11 @@ private void readValues() throws IOException { } } - boolean hasNext() throws IOException { + public boolean hasNext() throws IOException { return used != numLiterals || input.available() > 0; } - long next() throws IOException { + public long next() throws IOException { long result; if (used == numLiterals) { readValues(); @@ -88,7 +88,7 @@ long next() throws IOException { return result; } - void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); if (consumed != 0) { @@ -104,7 +104,7 @@ void seek(PositionProvider index) throws IOException { } } - void skip(long numValues) throws IOException { + public void skip(long numValues) throws IOException { while (numValues > 0) { if (used == numLiterals) { readValues(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java new file mode 100644 index 0000000..2cdc241 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; + +/** + * A reader that reads a sequence of light weight compressed integers. Refer + * {@link RunLengthIntegerWriterV2} for description of various lightweight + * compression techniques. 
+ */ +class RunLengthIntegerReaderV2 implements IntegerReader { + private final InStream input; + private final boolean signed; + private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE]; + private int numLiterals = 0; + private int used = 0; + + RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException { + this.input = input; + this.signed = signed; + } + + private void readValues() throws IOException { + // read the first 2 bits and determine the encoding type + int firstByte = input.read(); + if (firstByte < 0) { + throw new EOFException("Read past end of RLE integer from " + input); + } else { + int enc = (firstByte >>> 6) & 0x03; + if (EncodingType.SHORT_REPEAT.ordinal() == enc) { + readShortRepeatValues(firstByte); + } else if (EncodingType.DIRECT.ordinal() == enc) { + readDirectValues(firstByte); + } else if (EncodingType.PATCHED_BASE.ordinal() == enc) { + readPatchedBaseValues(firstByte); + } else { + readDeltaValues(firstByte); + } + } + } + + private void readDeltaValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fb = (firstByte >>> 1) & 0x1f; + if (fb != 0) { + fb = SerializationUtils.decodeBitWidth(fb); + } + + // extract the blob run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + + // read the first value stored as vint + long firstVal = 0; + if (signed) { + firstVal = SerializationUtils.readVslong(input); + } else { + firstVal = SerializationUtils.readVulong(input); + } + + // store first value to result buffer + long prevVal = firstVal; + literals[numLiterals++] = firstVal; + + // if fixed bits is 0 then all values have fixed delta + if (fb == 0) { + // read the fixed delta value stored as vint (deltas can be negative even + // if all number are positive) + long fd = SerializationUtils.readVslong(input); + + // add fixed deltas to adjacent values + for (int i = 0; i < len; i++) { + literals[numLiterals++] = literals[numLiterals - 2] + fd; + } + } else { + long deltaBase = SerializationUtils.readVslong(input); + // add delta base and first value + literals[numLiterals++] = firstVal + deltaBase; + prevVal = literals[numLiterals - 1]; + len -= 1; + + // write the unpacked values, add it to previous value and store final + // value to result buffer. 
if the delta base value is negative then it + // is a decreasing sequence else an increasing sequence + SerializationUtils.readInts(literals, numLiterals, len, fb, input); + while (len > 0) { + if (deltaBase < 0) { + literals[numLiterals] = prevVal - literals[numLiterals]; + } else { + literals[numLiterals] = prevVal + literals[numLiterals]; + } + prevVal = literals[numLiterals]; + len--; + numLiterals++; + } + } + } + + private void readPatchedBaseValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = SerializationUtils.decodeBitWidth(fbo); + + // extract the run length of data blob + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are always one off + len += 1; + + // extract the number of bytes occupied by base + int thirdByte = input.read(); + int bw = (thirdByte >>> 5) & 0x07; + // base width is one off + bw += 1; + + // extract patch width + int pwo = thirdByte & 0x1f; + int pw = SerializationUtils.decodeBitWidth(pwo); + + // read fourth byte and extract patch gap width + int fourthByte = input.read(); + int pgw = (fourthByte >>> 5) & 0x07; + // patch gap width is one off + pgw += 1; + + // extract the length of the patch list + int pl = fourthByte & 0x1f; + + // read the next base width number of bytes to extract base value + long base = SerializationUtils.bytesToLongBE(input, bw); + long mask = (1L << ((bw * 8) - 1)); + // if MSB of base value is 1 then base is negative value else positive + if ((base & mask) != 0) { + base = base & ~mask; + base = -base; + } + + // unpack the data blob + long[] unpacked = new long[len]; + SerializationUtils.readInts(unpacked, 0, len, fb, input); + + // unpack the patch blob + long[] unpackedPatch = new long[pl]; + SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input); + + // apply the patch directly when decoding the packed data + int patchIdx = 0; + long currGap = 0; + long currPatch = 0; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + long actualGap = 0; + + // special case: gap is >255 then patch value will be 0. + // if gap is <=255 then patch value cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // unpack data blob, patch it (if required), add base to get final result + for (int i = 0; i < unpacked.length; i++) { + if (i == actualGap) { + // apply patch + long patchedVal = unpacked[i] | (currPatch << fb); + + // add base to patched value and zigzag decode it + literals[numLiterals++] = base + patchedVal; + + // increment the patch to point to next entry in patch list + patchIdx++; + + if (patchIdx < pl) { + // read the next gap and patch + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + actualGap = 0; + + // special case: gap is >255 then patch will be 0. if gap is + // <=255 then patch cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // next gap is relative to the current gap + actualGap += i; + } + } else { + // no patching required. 
add base to unpacked value to get final value + literals[numLiterals++] = base + unpacked[i]; + } + } + + } + + private void readDirectValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = SerializationUtils.decodeBitWidth(fbo); + + // extract the run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are one off + len += 1; + + // write the unpacked values and zigzag decode to result buffer + SerializationUtils.readInts(literals, numLiterals, len, fb, input); + if (signed) { + for (int i = 0; i < len; i++) { + literals[numLiterals] = SerializationUtils + .zigzagDecode(literals[numLiterals]); + numLiterals++; + } + } else { + numLiterals += len; + } + } + + private void readShortRepeatValues(int firstByte) throws IOException { + + // read the number of bytes occupied by the value + int size = (firstByte >>> 3) & 0x07; + // #bytes are one off + size += 1; + + // read the run length + int len = firstByte & 0x07; + // run lengths values are stored only after MIN_REPEAT value is met + len += RunLengthIntegerWriterV2.MIN_REPEAT; + + // read the repeated value which is store using fixed bytes + long val = SerializationUtils.bytesToLongBE(input, size); + + if (signed) { + val = SerializationUtils.zigzagDecode(val); + } + + // repeat the value for length times + for (int i = 0; i < len; i++) { + literals[numLiterals++] = val; + } + } + + public boolean hasNext() throws IOException { + return used != numLiterals || input.available() > 0; + } + + public long next() throws IOException { + long result; + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + result = literals[used++]; + return result; + } + + public void seek(PositionProvider index) throws IOException { + input.seek(index); + int consumed = (int) index.getNext(); + if (consumed != 0) { + // a loop is required for cases where we break the run into two + // parts + while (consumed > 0) { + numLiterals = 0; + readValues(); + used = consumed; + consumed -= numLiterals; + } + } else { + used = 0; + numLiterals = 0; + } + } + + public void skip(long numValues) throws IOException { + while (numValues > 0) { + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + long consume = Math.min(numValues, numLiterals - used); + used += consume; + numValues -= consume; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java index aaca0a1..0497a56 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java @@ -25,7 +25,7 @@ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128 * literal vint values follow. 
*/ -class RunLengthIntegerWriter { +class RunLengthIntegerWriter implements IntegerWriter { static final int MIN_REPEAT_SIZE = 3; static final int MAX_DELTA = 127; static final int MIN_DELTA = -128; @@ -71,12 +71,12 @@ private void writeValues() throws IOException { } } - void flush() throws IOException { + public void flush() throws IOException { writeValues(); output.flush(); } - void write(long value) throws IOException { + public void write(long value) throws IOException { if (numLiterals == 0) { literals[numLiterals++] = value; tailRunLength = 1; @@ -130,8 +130,9 @@ void write(long value) throws IOException { } } - void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java new file mode 100644 index 0000000..d97bb0a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java @@ -0,0 +1,807 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * A writer that performs light weight compression over sequence of integers. + *

+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ *
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - fixed width * run length bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required for base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - base width * 8 bits</li>
+ * <li>Data blob - fixed width * run length</li>
+ * <li>Patch blob - (patch width + patch gap width) * patch length</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA:</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - encoded as varint</li>
+ * <li>Delta base - encoded as varint</li>
+ * <li>Delta blob - only positive values. monotonicity is decided based on
+ * the sign of delta base</li>
+ * </ul>
+ * </p>
+ */ +class RunLengthIntegerWriterV2 implements IntegerWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private static final int MAX_SHORT_REPEAT_SIZE = 8; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private final long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding; + private int numLiterals; + private long[] zigzagLiterals; + private long[] baseRedLiterals; + private long[] adjDeltas; + private long fixedDelta; + private int zzBits90p; + private int zzBits100p; + private int brBits95p; + private int brBits100p; + private int bitsDeltaMax; + private int patchWidth; + private int patchGapWidth; + private int patchLength; + private long[] gapVsPatchList; + private long min; + private boolean isFixedDelta; + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this.output = output; + this.signed = signed; + clear(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + int len = 0; + int fb = bitsDeltaMax; + int efb = 0; + + if (isFixedDelta) { + // if the fixed delta is 0 then the sequence is counted as fixed + // run length else as variable run length + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for fixed delta runs. sequences that require + // only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = SerializationUtils.encodeBitWidth(fb); + efb = efb << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + SerializationUtils.writeVslong(output, literals[0]); + } else { + SerializationUtils.writeVulong(output, literals[0]); + } + + // if delta is fixed then we don't need to store delta blob else store + // delta blob using fixed bit packing + if (isFixedDelta) { + // store the fixed delta using zigzag encoding. 
deltas can be negative + // even if all values are positive + SerializationUtils.writeVslong(output, fixedDelta); + } else { + // bit pack the delta blob + SerializationUtils.writeVslong(output, adjDeltas[0]); + SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, + output); + } + } + + private void writePatchedBaseValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int fb = brBits95p; + int efb = SerializationUtils.encodeBitWidth(fb) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = variableRunLength & 0xff; + + boolean isNegative = min < 0; + if (isNegative) { + min = -min; + } + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width. The additional bit is used to store the sign + // of the base value. + int baseWidth = SerializationUtils.findClosestNumBits(min) + 1; + int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1; + int bb = (baseBytes - 1) << 5; + + // if the base value is negative then add 1 to MSB + if (isNegative) { + min |= (1L << ((baseBytes * 8) - 1)); + } + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth); + + // fourth byte contains 3 bits for patch gap width and 5 bits for + // patch length + int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for (int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((min >>> (i * 8)) & 0xff); + output.write(b); + } + + // bit pack the base reduced values and write the bytes + int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p); + SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, + closestFixedBits, output); + + // write the patch blob + closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth + + patchWidth); + SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, + closestFixedBits, output); + + // reset run length + variableRunLength = 0; + } + + private int getOpcode() { + return encoding.ordinal() << 6; + } + + private void writeDirectValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of + // runlength + int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit pack the zigzag encoded values and write the bytes + SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, + zzBits100p, output); + + // reset run length + variableRunLength = 0; + } + +
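(For orientation — a minimal sketch, not lines from the patch: writeDirectValues() above emits the two-byte DIRECT header described in the class javadoc, and readDirectValues() in RunLengthIntegerReaderV2 reverses it. Decoding that header looks roughly like this, reusing SerializationUtils from this patch.)

  static int[] decodeDirectHeader(int firstByte, int secondByte) {
    int enc = (firstByte >>> 6) & 0x03;                     // 2 bits: encoding type (DIRECT == 1)
    int fbo = (firstByte >>> 1) & 0x1f;                     // 5 bits: encoded fixed bit width
    int fb = SerializationUtils.decodeBitWidth(fbo);        // e.g. fbo == 3 decodes to 4 bits
    int len = (((firstByte & 0x01) << 8) | secondByte) + 1; // 9-bit run length, stored one off
    return new int[] { enc, fb, len };
  }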
private void writeShortRepeatValues() throws IOException { + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = SerializationUtils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal); + int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // if the runs are too long or too short, or if the delta is non-zero, + // then choose a different algorithm + if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH + && numBytesRepeatVal <= MAX_SHORT_REPEAT_SIZE && prevDelta == 0) { + // write encoding type in top 2 bits + int header = getOpcode(); + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the payload (i.e. the repeat value) in big endian + for (int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } else { + determineEncoding(); + writeValues(); + } + } + + private void determineEncoding() { + // used for direct encoding + zigzagLiterals = new long[numLiterals]; + + // used for patched base encoding + baseRedLiterals = new long[numLiterals]; + + // used for delta encoding + adjDeltas = new long[numLiterals - 1]; + + int idx = 0; + + // for identifying monotonic sequences + boolean isIncreasing = false; + int increasingCount = 1; + boolean isDecreasing = false; + int decreasingCount = 1; + + // for identifying type of delta encoding + min = literals[0]; + long max = literals[0]; + isFixedDelta = true; + long currDelta = 0; + + min = literals[0]; + long deltaMax = 0; + + // populate all variables to identify the encoding type + if (numLiterals >= 1) { + currDelta = literals[1] - literals[0]; + for (int i = 0; i < numLiterals; i++) { + if (i > 0 && literals[i] >= max) { + max = literals[i]; + increasingCount++; + } + + if (i > 0 && literals[i] <= min) { + min = literals[i]; + decreasingCount++; + } + + // if delta doesn't change then mark it as fixed delta + if (i > 0 && isFixedDelta) { + if (literals[i] - literals[i - 1] != currDelta) { + isFixedDelta = false; + } + + fixedDelta = currDelta; + } + + // store the minimum value among zigzag encoded values. 
The min + // value (base) will be removed in patched base encoding + long zzEncVal = 0; + if (signed) { + zzEncVal = SerializationUtils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[idx] = zzEncVal; + idx++; + + // max delta value is required for computing the fixed bits + // required for delta blob in delta encoding + if (i > 0) { + if (i == 1) { + // first value preserve the sign + adjDeltas[i - 1] = literals[i] - literals[i - 1]; + } else { + adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]); + if (adjDeltas[i - 1] > deltaMax) { + deltaMax = adjDeltas[i - 1]; + } + } + } + } + + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax); + + // if decreasing count equals total number of literals then the + // sequence is monotonically decreasing + if (increasingCount == 1 && decreasingCount == numLiterals) { + isDecreasing = true; + } + + // if increasing count equals total number of literals then the + // sequence is monotonically increasing + if (decreasingCount == 1 && increasingCount == numLiterals) { + isIncreasing = true; + } + } + + // if the sequence is both increasing and decreasing then it is not + // monotonic + if (isDecreasing && isIncreasing) { + isDecreasing = false; + isIncreasing = false; + } + + // fixed delta condition + if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) { + encoding = EncodingType.DELTA; + return; + } + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct or delta encoding + + double p = 0.9; + zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p); + + p = 1.0; + zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p); + + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need to patch the values + if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1 + && isFixedDelta == false) { + // patching is done only on base reduced values. + // remove base from literals + for (int i = 0; i < zigzagLiterals.length; i++) { + baseRedLiterals[i] = literals[i] - min; + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + p = 0.95; + brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p); + + // 100th percentile is used to compute the max patch width + p = 1.0; + brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fall back to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. + if ((brBits100p - brBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } + + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. 
Hence we will fall back to DIRECT + if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 + && isFixedDelta == false) { + encoding = EncodingType.DIRECT; + return; + } + + // this should not happen + if (encoding == null) { + throw new RuntimeException("Integer encoding cannot be determined."); + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + int mask = (1 << brBits95p) - 1; + + // since we are considering only the 95th percentile, the gap and + // patch arrays can contain at most 5% of the values + patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05)); + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = brBits100p - brBits95p; + patchWidth = SerializationUtils.getClosestFixedBits(patchWidth); + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for (int i = 0; i < baseRedLiterals.length; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask + long patch = baseRedLiterals[i] >>> brBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, but to store the gap as 0 we need at least 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + patchGapWidth = SerializationUtils.findClosestNumBits(maxGap); + } + + // special case: if the patch gap is greater than 256, then + // we need 9 bits to encode the gap. But we only have 3 bits in + // header to record the gap width. 
To deal with this case, we will save + // two entries in final patch list with following entries + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for (int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255 << patchWidth) | 0; + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + encoding = null; + prevDelta = 0; + zigzagLiterals = null; + baseRedLiterals = null; + adjDeltas = null; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + brBits95p = 0; + brBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + min = 0; + isFixedDelta = false; + } + + public void flush() throws IOException { + if (numLiterals != 0) { + if (numLiterals == 1) { + encoding = EncodingType.SHORT_REPEAT; + fixedRunLength = 1; + writeValues(); + } else if (variableRunLength != 0) { + determineEncoding(); + writeValues(); + } else if (fixedRunLength != 0) { + if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } + } + } + output.flush(); + } + + public void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = new long[MIN_REPEAT]; + System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for (long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and satisfies the + // short repeat algorithm conditions then write the values + // as short repeats else determine the appropriate 
algorithm + // to persist the values + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + determineEncoding(); + writeValues(); + } + } + + // if fixed run length is non-zero but less than MIN_REPEAT and the + // current value is different from the previous value then treat it as + // a variable run + if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reaches the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java index 67762b5..d08c1cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java @@ -185,4 +185,279 @@ static BigInteger readBigInteger(InputStream input) throws IOException { result = result.shiftRight(1); return result; } + + enum FixedBitSizes { + ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, + THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, + TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, + TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR; + } + + /** + * Count the number of bits required to encode the given value + * @param value + * @return bits required to store value + */ + static int findClosestNumBits(long value) { + int count = 0; + while (value > 0) { + count++; + value = value >>> 1; + } + return getClosestFixedBits(count); + } + + /** + * zigzag encode the given value + * @param val + * @return zigzag encoded value + */ + static long zigzagEncode(long val) { + return (val << 1) ^ (val >> 63); + } + + /** + * zigzag decode the given value + * @param val + * @return zigzag decoded value + */ + static long zigzagDecode(long val) { + return (val >>> 1) ^ -(val & 1); + } + + /** + * Compute the bits required to represent pth percentile value + * @param data - array + * @param p - percentile value (>=0.0 to <=1.0) + * @return pth percentile bits + */ + static int percentileBits(long[] data, double p) { + if ((p > 1.0) || (p <= 0.0)) { + return -1; + } + + // histogram that stores the encoded bit requirement for each value. 
+
+  /**
+   * Compute the bits required to represent pth percentile value.
+   * @param data - array
+   * @param p - percentile value (>=0.0 to <=1.0)
+   * @return pth percentile bits
+   */
+  static int percentileBits(long[] data, double p) {
+    if ((p > 1.0) || (p <= 0.0)) {
+      return -1;
+    }
+
+    // histogram that stores the encoded bit requirement for each value;
+    // the maximum number of bit widths that can be encoded is 32
+    // (refer FixedBitSizes)
+    int[] hist = new int[32];
+
+    // compute the histogram
+    for (long l : data) {
+      int idx = encodeBitWidth(findClosestNumBits(l));
+      hist[idx] += 1;
+    }
+
+    int len = data.length;
+    int perLen = (int) (len * (1.0 - p));
+
+    // return the bits required by pth percentile length
+    for (int i = hist.length - 1; i >= 0; i--) {
+      perLen -= hist[i];
+      if (perLen < 0) {
+        return decodeBitWidth(i);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Read n bytes in big endian order and convert to long.
+   * @param input - input stream
+   * @param n - number of bytes to read
+   * @return long value
+   */
+  static long bytesToLongBE(InStream input, int n) throws IOException {
+    long out = 0;
+    long val = 0;
+    while (n > 0) {
+      n--;
+      // store it in a long and then shift else integer overflow will occur
+      val = input.read();
+      out |= (val << (n * 8));
+    }
+    return out;
+  }
+
+  /**
+   * Calculate the number of bytes required.
+   * @param n - number of values
+   * @param numBits - bit width
+   * @return number of bytes required
+   */
+  static int getTotalBytesRequired(int n, int numBits) {
+    return (n * numBits + 7) / 8;
+  }
+
+  /**
+   * For a given fixed bit this function will return the closest available
+   * fixed bit.
+   * @param n
+   * @return closest valid fixed bit
+   */
+  static int getClosestFixedBits(int n) {
+    if (n == 0) {
+      return 1;
+    }
+
+    if (n >= 1 && n <= 24) {
+      return n;
+    } else if (n > 24 && n <= 26) {
+      return 26;
+    } else if (n > 26 && n <= 28) {
+      return 28;
+    } else if (n > 28 && n <= 30) {
+      return 30;
+    } else if (n > 30 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Finds the closest available fixed bit width match and returns its encoded
+   * value (ordinal).
+   * @param n - fixed bit width to encode
+   * @return encoded fixed bit width
+   */
+  static int encodeBitWidth(int n) {
+    n = getClosestFixedBits(n);
+
+    if (n >= 1 && n <= 24) {
+      return n - 1;
+    } else if (n > 24 && n <= 26) {
+      return FixedBitSizes.TWENTYSIX.ordinal();
+    } else if (n > 26 && n <= 28) {
+      return FixedBitSizes.TWENTYEIGHT.ordinal();
+    } else if (n > 28 && n <= 30) {
+      return FixedBitSizes.THIRTY.ordinal();
+    } else if (n > 30 && n <= 32) {
+      return FixedBitSizes.THIRTYTWO.ordinal();
+    } else if (n > 32 && n <= 40) {
+      return FixedBitSizes.FORTY.ordinal();
+    } else if (n > 40 && n <= 48) {
+      return FixedBitSizes.FORTYEIGHT.ordinal();
+    } else if (n > 48 && n <= 56) {
+      return FixedBitSizes.FIFTYSIX.ordinal();
+    } else {
+      return FixedBitSizes.SIXTYFOUR.ordinal();
+    }
+  }
+
+  /**
+   * Decodes the ordinal fixed bit value to actual fixed bit width value.
+   * @param n - encoded fixed bit width
+   * @return decoded fixed bit width
+   */
+  static int decodeBitWidth(int n) {
+    if (n >= FixedBitSizes.ONE.ordinal()
+        && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+      return n + 1;
+    } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+      return 26;
+    } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+      return 28;
+    } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+      return 30;
+    } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+      return 32;
+    } else if (n == FixedBitSizes.FORTY.ordinal()) {
+      return 40;
+    } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+      return 48;
+    } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
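A worked example of percentileBits may help: if 95 of 100 values need 8 bits and 5 outliers need 16, the 0.95 percentile width ignores the outliers while the 1.0 percentile does not, which is exactly the split the patched-base encoding exploits. A sketch, assuming it sits in the same package so the package-private helpers above are visible:

    public class PercentileBitsDemo {
      public static void main(String[] args) {
        long[] data = new long[100];
        java.util.Arrays.fill(data, 0, 95, 200L);     // 200 fits in 8 bits
        java.util.Arrays.fill(data, 95, 100, 40000L); // 40000 needs 16 bits
        System.out.println(SerializationUtils.percentileBits(data, 0.95)); // 8
        System.out.println(SerializationUtils.percentileBits(data, 1.0));  // 16
      }
    }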
+
+  /**
+   * Bitpack and write the input values to the underlying output stream.
+   * @param input - values to write
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param output - output stream
+   * @throws IOException
+   */
+  static void writeInts(long[] input, int offset, int len, int bitSize,
+      OutputStream output) throws IOException {
+    if (input == null || input.length < 1 || offset < 0 || len < 1
+        || bitSize < 1) {
+      return;
+    }
+
+    int bitsLeft = 8;
+    byte current = 0;
+    for (int i = offset; i < (offset + len); i++) {
+      long value = input[i];
+      int bitsToWrite = bitSize;
+      while (bitsToWrite > bitsLeft) {
+        // add the bits to the bottom of the current word
+        current |= value >>> (bitsToWrite - bitsLeft);
+        // subtract out the bits we just added
+        bitsToWrite -= bitsLeft;
+        // zero out the bits above bitsToWrite
+        value &= (1L << bitsToWrite) - 1;
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+      bitsLeft -= bitsToWrite;
+      current |= value << bitsLeft;
+      if (bitsLeft == 0) {
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+    }
+
+    // flush
+    if (bitsLeft != 8) {
+      output.write(current);
+      current = 0;
+      bitsLeft = 8;
+    }
+  }
+
+  /**
+   * Read bitpacked integers from the input stream.
+   * @param buffer - input buffer
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param input - input stream
+   * @throws IOException
+   */
+  static void readInts(long[] buffer, int offset, int len, int bitSize,
+      InStream input) throws IOException {
+    int bitsLeft = 0;
+    int current = 0;
+
+    for (int i = offset; i < (offset + len); i++) {
+      long result = 0;
+      int bitsLeftToRead = bitSize;
+      while (bitsLeftToRead > bitsLeft) {
+        result <<= bitsLeft;
+        result |= current & ((1 << bitsLeft) - 1);
+        bitsLeftToRead -= bitsLeft;
+        current = input.read();
+        bitsLeft = 8;
+      }
+
+      // handle the left over bits
+      if (bitsLeftToRead > 0) {
+        result <<= bitsLeftToRead;
+        bitsLeft -= bitsLeftToRead;
+        result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+      }
+      buffer[i] = result;
+    }
+  }
 }
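The packing above is MSB-first and tight: values are written high bits first and byte boundaries are ignored, so n values of width w always occupy ceil(n*w/8) bytes. A small round trip, assuming same-package access to writeInts (readInts needs ORC's InStream, so the unpack half below repeats its sliding-window logic on a plain byte array):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class BitPackDemo {
      public static void main(String[] args) throws IOException {
        long[] vals = { 1, 5, 7, 3, 0, 6 };
        int bitSize = 3; // 6 values * 3 bits = 18 bits -> 3 bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SerializationUtils.writeInts(vals, 0, vals.length, bitSize, out);
        byte[] packed = out.toByteArray();
        System.out.println(packed.length); // 3

        long[] unpacked = new long[vals.length];
        int current = 0, bitsLeft = 0, pos = 0;
        for (int i = 0; i < vals.length; i++) {
          long result = 0;
          int toRead = bitSize;
          while (toRead > bitsLeft) {
            result = (result << bitsLeft) | (current & ((1 << bitsLeft) - 1));
            toRead -= bitsLeft;
            current = packed[pos++] & 0xff;
            bitsLeft = 8;
          }
          if (toRead > 0) {
            bitsLeft -= toRead;
            result = (result << toRead)
                | ((current >> bitsLeft) & ((1 << toRead) - 1));
          }
          unpacked[i] = result; // unpacked ends up equal to vals
        }
      }
    }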
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 9f314d9..c048605 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -366,6 +366,7 @@ public boolean isCompressed() {
     private final PositionedOutputStream rowIndexStream;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
+    protected final boolean useDirectV2Encoding;
 
     /**
      * Create a tree writer.
@@ -381,6 +382,7 @@ public boolean isCompressed() {
       this.isCompressed = streamFactory.isCompressed();
      this.id = columnId;
       this.inspector = inspector;
+      this.useDirectV2Encoding = true;
       if (nullable) {
         isPresentOutStream = streamFactory.createStream(id,
             OrcProto.Stream.Kind.PRESENT);
@@ -495,6 +497,10 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
      * @return the information about the encoding of this column
      */
     OrcProto.ColumnEncoding getEncoding() {
+      if (useDirectV2Encoding) {
+        return OrcProto.ColumnEncoding.newBuilder().setKind(
+            OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
       return OrcProto.ColumnEncoding.newBuilder().setKind(
           OrcProto.ColumnEncoding.Kind.DIRECT).build();
     }
@@ -620,7 +626,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
   }
 
   private static class IntegerTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter writer;
+    private final IntegerWriter writer;
     private final ShortObjectInspector shortInspector;
     private final IntObjectInspector intInspector;
     private final LongObjectInspector longInspector;
@@ -632,7 +638,11 @@ void recordPosition(PositionRecorder recorder) throws IOException {
       super(columnId, inspector, writer, nullable);
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
-      this.writer = new RunLengthIntegerWriter(out, true);
+      if (super.useDirectV2Encoding) {
+        this.writer = new RunLengthIntegerWriterV2(out, true);
+      } else {
+        this.writer = new RunLengthIntegerWriter(out, true);
+      }
       if (inspector instanceof IntObjectInspector) {
         intInspector = (IntObjectInspector) inspector;
         shortInspector = null;
@@ -761,8 +771,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
   private static class StringTreeWriter extends TreeWriter {
     private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final PositionedOutputStream stringOutput;
-    private final RunLengthIntegerWriter lengthOutput;
-    private final RunLengthIntegerWriter rowOutput;
+    private final IntegerWriter lengthOutput;
+    private final IntegerWriter rowOutput;
     private final StringRedBlackTree dictionary =
         new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
     private final DynamicIntArray rows = new DynamicIntArray();
@@ -778,10 +788,17 @@ void recordPosition(PositionRecorder recorder) throws IOException {
       super(columnId, inspector, writer, nullable);
       stringOutput = writer.createStream(id,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
-      lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false);
-      rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), false);
+      if (super.useDirectV2Encoding) {
+        lengthOutput = new RunLengthIntegerWriterV2(writer.createStream(id,
+            OrcProto.Stream.Kind.LENGTH), false);
+        rowOutput = new RunLengthIntegerWriterV2(writer.createStream(id,
+            OrcProto.Stream.Kind.DATA), false);
+      } else {
+        lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
+            OrcProto.Stream.Kind.LENGTH), false);
+        rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
+            OrcProto.Stream.Kind.DATA), false);
+      }
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(0L);
       buildIndex = writer.buildIndex();
@@ -851,6 +868,11 @@ public void visit(StringRedBlackTree.VisitorContext context
 
     @Override
     OrcProto.ColumnEncoding getEncoding() {
+      if (super.useDirectV2Encoding) {
+        return OrcProto.ColumnEncoding.newBuilder().setKind(
+            OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+            setDictionarySize(dictionary.size()).build();
+      }
       return OrcProto.ColumnEncoding.newBuilder().setKind(
           OrcProto.ColumnEncoding.Kind.DICTIONARY).
           setDictionarySize(dictionary.size()).build();
@@ -883,7 +905,7 @@ long estimateMemory() {
 
   private static class BinaryTreeWriter extends TreeWriter {
     private final PositionedOutputStream stream;
-    private final RunLengthIntegerWriter length;
+    private final IntegerWriter length;
 
     BinaryTreeWriter(int columnId,
                      ObjectInspector inspector,
@@ -892,8 +914,13 @@ long estimateMemory() {
       super(columnId, inspector, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
-      this.length = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false);
+      if (super.useDirectV2Encoding) {
+        this.length = new RunLengthIntegerWriterV2(writer.createStream(id,
+            OrcProto.Stream.Kind.LENGTH), false);
+      } else {
+        this.length = new RunLengthIntegerWriter(writer.createStream(id,
+            OrcProto.Stream.Kind.LENGTH), false);
+      }
       recordPosition(rowIndexPosition);
     }
 
@@ -930,18 +957,25 @@ void recordPosition(PositionRecorder recorder) throws IOException {
       Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
 
   private static class TimestampTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter seconds;
-    private final RunLengthIntegerWriter nanos;
+    private final IntegerWriter seconds;
+    private final IntegerWriter nanos;
 
     TimestampTreeWriter(int columnId,
                      ObjectInspector inspector,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
-      this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), true);
-      this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), false);
+      if (super.useDirectV2Encoding) {
+        this.seconds = new RunLengthIntegerWriterV2(writer.createStream(id,
+            OrcProto.Stream.Kind.DATA), true);
+        this.nanos = new RunLengthIntegerWriterV2(writer.createStream(id,
+            OrcProto.Stream.Kind.SECONDARY), false);
+      } else {
+        this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
+            OrcProto.Stream.Kind.DATA), true);
+        this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
+            OrcProto.Stream.Kind.SECONDARY), false);
+      }
       recordPosition(rowIndexPosition);
     }
 
@@ -1032,7 +1066,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
 
   private static class DecimalTreeWriter extends TreeWriter {
     private final PositionedOutputStream valueStream;
-    private final RunLengthIntegerWriter scaleStream;
+    private final IntegerWriter scaleStream;
 
     DecimalTreeWriter(int columnId,
                       ObjectInspector inspector,
@@ -1040,8 +1074,13 @@ void recordPosition(PositionRecorder recorder) throws IOException {
                       boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
       valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), true);
+      if (super.useDirectV2Encoding) {
+        this.scaleStream = new RunLengthIntegerWriterV2(writer.createStream(id,
+            OrcProto.Stream.Kind.SECONDARY), true);
+      } else {
+        this.scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+            OrcProto.Stream.Kind.SECONDARY), true);
+      }
       recordPosition(rowIndexPosition);
     }
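An editorial observation: the same useDirectV2Encoding conditional recurs in every tree writer in this patch. It could be centralized in TreeWriter with a small factory method; a sketch under the patch's own types, with a hypothetical helper name:

    // Hypothetical helper, not in the patch: pick the RLE writer version once.
    private IntegerWriter createIntegerWriter(PositionedOutputStream out,
        boolean signed) {
      if (useDirectV2Encoding) {
        return new RunLengthIntegerWriterV2(out, signed);
      }
      return new RunLengthIntegerWriter(out, signed);
    }
    // e.g. in IntegerTreeWriter's constructor:
    //   this.writer = createIntegerWriter(out, true);

That would shrink each constructor back to one line and keep the version choice in a single place.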
@@ -1118,7 +1157,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
   }
 
   private static class ListTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter lengths;
+    private final IntegerWriter lengths;
 
     ListTreeWriter(int columnId,
                    ObjectInspector inspector,
@@ -1130,9 +1169,13 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
       childrenWriters[0] =
         createTreeWriter(listObjectInspector.getListElementObjectInspector(),
           writer, true);
-      lengths =
-        new RunLengthIntegerWriter(writer.createStream(columnId,
+      if (super.useDirectV2Encoding) {
+        lengths = new RunLengthIntegerWriterV2(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false);
+      } else {
+        lengths = new RunLengthIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false);
+      }
       recordPosition(rowIndexPosition);
     }
@@ -1168,7 +1211,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
   }
 
   private static class MapTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter lengths;
+    private final IntegerWriter lengths;
 
     MapTreeWriter(int columnId,
                   ObjectInspector inspector,
@@ -1181,9 +1224,15 @@ void recordPosition(PositionRecorder recorder) throws IOException {
         createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
       childrenWriters[1] =
         createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
-      lengths =
-        new RunLengthIntegerWriter(writer.createStream(columnId,
-          OrcProto.Stream.Kind.LENGTH), false);
+      if (super.useDirectV2Encoding) {
+        lengths =
+          new RunLengthIntegerWriterV2(writer.createStream(columnId,
+            OrcProto.Stream.Kind.LENGTH), false);
+      } else {
+        lengths =
+          new RunLengthIntegerWriter(writer.createStream(columnId,
+            OrcProto.Stream.Kind.LENGTH), false);
+      }
       recordPosition(rowIndexPosition);
     }
diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index 417c37a..d5bea25 100644
--- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -73,6 +73,8 @@ message ColumnEncoding {
   enum Kind {
     DIRECT = 0;
     DICTIONARY = 1;
+    DIRECT_V2 = 2;
+    DICTIONARY_V2 = 3;
   }
   required Kind kind = 1;
   optional uint32 dictionarySize = 2;
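A compatibility note on the proto change (editorial, and somewhat of an assumption): kind is a required proto2 enum, and proto2 treats unrecognized enum values as unknown fields, so a reader built before this change will generally fail to parse a stripe footer that uses DIRECT_V2 or DICTIONARY_V2. New readers may still want an explicit guard rather than relying on parse failure; a sketch, not part of this patch:

    // Sketch only: reject column encodings this reader does not understand.
    static void checkEncoding(OrcProto.ColumnEncoding encoding) {
      switch (encoding.getKind()) {
        case DIRECT:
        case DICTIONARY:
        case DIRECT_V2:
        case DICTIONARY_V2:
          break;
        default:
          throw new IllegalArgumentException(
              "Unknown column encoding kind " + encoding.getKind());
      }
    }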
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
new file mode 100644
index 0000000..5d7cf1d
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+public class TestBitPack {
+
+  private static final int SIZE = 100;
+  private static Random rand = new Random(100);
+
+  // note: despite the name, this applies zigzag encoding so that negative
+  // inputs pack into the unsigned space
+  private long[] deltaEncode(long[] inp) {
+    long[] output = new long[inp.length];
+    for (int i = 0; i < inp.length; i++) {
+      output[i] = SerializationUtils.zigzagEncode(inp[i]);
+    }
+    return output;
+  }
+
+  private long nextLong(Random rng, long n) {
+    long bits, val;
+    do {
+      bits = (rng.nextLong() << 1) >>> 1;
+      val = bits % n;
+    } while (bits - val + (n - 1) < 0L);
+    return val;
+  }
+
+  private void runTest(int numBits) throws IOException {
+    long[] inp = new long[SIZE];
+    for (int i = 0; i < SIZE; i++) {
+      long val = 0;
+      if (numBits <= 32) {
+        if (numBits == 1) {
+          val = -1 * rand.nextInt(2);
+        } else {
+          val = rand.nextInt((int) Math.pow(2, numBits - 1));
+        }
+      } else {
+        val = nextLong(rand, (long) Math.pow(2, numBits - 2));
+      }
+      if (val % 2 == 0) {
+        val = -val;
+      }
+      inp[i] = val;
+    }
+    long[] deltaEncoded = deltaEncode(inp);
+    long minInput = Collections.min(Longs.asList(deltaEncoded));
+    long maxInput = Collections.max(Longs.asList(deltaEncoded));
+    long rangeInput = maxInput - minInput;
+    int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    OutStream output = new OutStream("test", SIZE, null, collect);
+    SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length,
+        fixedWidth, output);
+    output.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    long[] buff = new long[SIZE];
+    SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
+        InStream.create("test", inBuf, null, SIZE));
+    for (int i = 0; i < SIZE; i++) {
+      buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+    }
+    assertEquals(numBits, fixedWidth);
+    assertArrayEquals(inp, buff);
+  }
+
+  @Test
+  public void test01BitPacking1Bit() throws IOException {
+    runTest(1);
+  }
+
+  @Test
+  public void test02BitPacking2Bit() throws IOException {
+    runTest(2);
+  }
+
+  @Test
+  public void test03BitPacking3Bit() throws IOException {
+    runTest(3);
+  }
+
+  @Test
+  public void test04BitPacking4Bit() throws IOException {
+    runTest(4);
+  }
+
+  @Test
+  public void test05BitPacking5Bit() throws IOException {
+    runTest(5);
+  }
+
+  @Test
+  public void test06BitPacking6Bit() throws IOException {
+    runTest(6);
+  }
+
+  @Test
+  public void test07BitPacking7Bit() throws IOException {
+    runTest(7);
+  }
+
+  @Test
+  public void test08BitPacking8Bit() throws IOException {
+    runTest(8);
+  }
+
+  @Test
+  public void test09BitPacking9Bit() throws IOException {
+    runTest(9);
+  }
+
+  @Test
+  public void test10BitPacking10Bit() throws IOException {
+    runTest(10);
+  }
+
+  @Test
+  public void test11BitPacking11Bit() throws IOException {
+    runTest(11);
+  }
+
+  @Test
+  public void test12BitPacking12Bit() throws IOException {
+    runTest(12);
+  }
+
+  @Test
+  public void test13BitPacking13Bit() throws IOException {
+    runTest(13);
+  }
+
+  @Test
+  public void test14BitPacking14Bit() throws IOException {
+    runTest(14);
+  }
+
+  @Test
+  public void test15BitPacking15Bit() throws IOException {
+    runTest(15);
+  }
+
+  @Test
+  public void test16BitPacking16Bit() throws IOException {
+    runTest(16);
+  }
+
+  @Test
+  public void test17BitPacking17Bit() throws IOException {
+    runTest(17);
+  }
+
+  @Test
+  public void test18BitPacking18Bit() throws IOException {
+    runTest(18);
+  }
+
+  @Test
+  public void test19BitPacking19Bit() throws IOException {
+    runTest(19);
+  }
+
+  @Test
+  public void test20BitPacking20Bit() throws IOException {
+    runTest(20);
+  }
+
+  @Test
+  public void test21BitPacking21Bit() throws IOException {
+    runTest(21);
+  }
+
+  @Test
+  public void test22BitPacking22Bit() throws IOException {
+    runTest(22);
+  }
+
+  @Test
+  public void test23BitPacking23Bit() throws IOException {
+    runTest(23);
+  }
+
+  @Test
+  public void test24BitPacking24Bit() throws IOException {
+    runTest(24);
+  }
+
+  @Test
+  public void test26BitPacking26Bit() throws IOException {
+    runTest(26);
+  }
+
+  @Test
+  public void test28BitPacking28Bit() throws IOException {
+    runTest(28);
+  }
+
+  @Test
+  public void test30BitPacking30Bit() throws IOException {
+    runTest(30);
+  }
+
+  @Test
+  public void test32BitPacking32Bit() throws IOException {
+    runTest(32);
+  }
+
+  @Test
+  public void test40BitPacking40Bit() throws IOException {
+    runTest(40);
+  }
+
+  @Test
+  public void test48BitPacking48Bit() throws IOException {
+    runTest(48);
+  }
+
+  @Test
+  public void test56BitPacking56Bit() throws IOException {
+    runTest(56);
+  }
+
+  @Test
+  public void test64BitPacking64Bit() throws IOException {
+    runTest(64);
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
index a3cf6e9..c5d4a5f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
@@ -18,15 +18,8 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -35,8 +28,13 @@ import java.io.PrintStream;
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.junit.Before;
+import org.junit.Test;
 
 public class TestFileDump {
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
new file mode 100644
index 0000000..0fb0fcb
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class TestIntegerCompressionReader {
+
+  public void runSeekTest(CompressionCodec codec) throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 1000, codec, collect), true);
+    TestInStream.PositionCollector[] positions =
+        new TestInStream.PositionCollector[4096];
+    Random random = new Random(99);
+    int[] junk = new int[2048];
+    for(int i=0; i < junk.length; ++i) {
+      junk[i] = random.nextInt();
+    }
+    for(int i=0; i < 4096; ++i) {
+      positions[i] = new TestInStream.PositionCollector();
+      out.getPosition(positions[i]);
+      // test runs, incrementing runs, non-runs
+      if (i < 1024) {
+        out.write(i/4);
+      } else if (i < 2048) {
+        out.write(2*i);
+      } else {
+        out.write(junk[i-2048]);
+      }
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+        ("test", inBuf, codec, 1000), true);
+    // read back all 4096 values so the junk branch is exercised too
+    for(int i=0; i < 4096; ++i) {
+      int x = (int) in.next();
+      if (i < 1024) {
+        assertEquals(i/4, x);
+      } else if (i < 2048) {
+        assertEquals(2*i, x);
+      } else {
+        assertEquals(junk[i-2048], x);
+      }
+    }
+    for(int i=4095; i >= 0; --i) {
+      in.seek(positions[i]);
+      int x = (int) in.next();
+      if (i < 1024) {
+        assertEquals(i/4, x);
+      } else if (i < 2048) {
+        assertEquals(2*i, x);
+      } else {
+        assertEquals(junk[i-2048], x);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedSeek() throws Exception {
+    runSeekTest(null);
+  }
+
+  @Test
+  public void testCompressedSeek() throws Exception {
+    runSeekTest(new ZlibCodec());
+  }
+
+  @Test
+  public void testSkips() throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 100, null, collect), true);
+    for(int i=0; i < 2048; ++i) {
+      if (i < 1024) {
+        out.write(i);
+      } else {
+        out.write(256 * i);
+      }
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+        ("test", inBuf, null, 100), true);
+    for(int i=0; i < 2048; i += 10) {
+      int x = (int) in.next();
+      if (i < 1024) {
+        assertEquals(i, x);
+      } else {
+        assertEquals(256 * i, x);
+      }
+      if (i < 2038) {
+        in.skip(9);
+      }
+      in.skip(0);
+    }
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
new file mode 100644
index 0000000..bdc7a15
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -0,0 +1,793 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+public class TestNewIntegerEncoding {
+
+  public static class Row {
+    Integer int1;
+    Long long1;
+
+    public Row(int val, long l) {
+      this.int1 = val;
+      this.long1 = l;
+    }
+  }
+
+  public List<Long> fetchData(String path) throws IOException {
+    List<Long> input = new ArrayList<Long>();
+    FileInputStream stream = new FileInputStream(new File(path));
+    try {
+      FileChannel fc = stream.getChannel();
+      MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+      /* Instead of using default, pass in a decoder. */
+      String[] lines = Charset.defaultCharset().decode(bb).toString()
+          .split("\n");
+      for (String line : lines) {
+        long val = 0;
+        try {
+          val = Long.parseLong(line);
+        } catch (NumberFormatException e) {
+          // for now lets ignore (assign 0)
+        }
+        input.add(val);
+      }
+    } finally {
+      stream.close();
+    }
+    return input;
+  }
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target"
+      + File.separator + "test" + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  String resDir = "ql/src/test/resources";
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile."
+        + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testBasicRow() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    writer.addRow(new Row(111, 1111L));
+    writer.addRow(new Row(111, 1111L));
+    writer.addRow(new Row(111, 1111L));
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0));
+      assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1));
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+        7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+        2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+        9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+        1, 1, 1, 1 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testBasicDelta1() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { -500, -400, -350, -325, -310 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testBasicDelta2() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { -500, -600, -650, -675, -710 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testBasicDelta3() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 500, 400, 350, 325, 310 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testBasicDelta4() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 500, 600, 650, 675, 710 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testIntegerMin() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    input.add((long) Integer.MIN_VALUE);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testIntegerMax() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    input.add((long) Integer.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testLongMin() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    input.add(Long.MIN_VALUE);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testLongMax() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    input.add(Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testRandomInt() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 100000; i++) {
+      input.add((long) rand.nextInt());
+    }
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testRandomLong() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 100000; i++) {
+      input.add(rand.nextLong());
+    }
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseNegativeMin() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -13, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseNegativeMin2() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -1, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseNegativeMin3() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, 0, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseNegativeMin4() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 13, 13, 11, 8, 13, 10, 10, 11, 11, 14, 11, 7, 13,
+        12, 12, 11, 15, 12, 12, 9, 8, 10, 13, 11, 8, 6, 5, 6, 11, 7, 15, 10, 7,
+        6, 8, 7, 9, 9, 11, 33, 11, 3, 7, 4, 6, 10, 14, 12, 5, 14, 7, 6 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt0() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(0, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt1() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(1, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt255() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(255, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt256() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(256, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBase510() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(510, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBase511() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(511, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 100000; i++) {
+      input.add((long) rand.nextInt());
+    }
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 55555;
+    rows.seekToRow(idx);
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
index 9f989fd..2f7a7f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.io.orc;
 
 import static junit.framework.Assert.assertEquals;