diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2ab4826..8e2e90b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -502,6 +502,9 @@ // Maximum fraction of heap that can be used by ORC file writers HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50% + // use 0.11 version of RLE encoding. if this conf is not defined or any + // other value specified, ORC will use the new RLE encoding + HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", "0.11"), HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f), diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java index 19a6d0e..55ac8c6 100644 --- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java +++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java @@ -5695,10 +5695,14 @@ public ColumnEncoding getDefaultInstanceForType() { implements com.google.protobuf.ProtocolMessageEnum { DIRECT(0, 0), DICTIONARY(1, 1), + DIRECT_V2(2, 2), + DICTIONARY_V2(3, 3), ; public static final int DIRECT_VALUE = 0; public static final int DICTIONARY_VALUE = 1; + public static final int DIRECT_V2_VALUE = 2; + public static final int DICTIONARY_V2_VALUE = 3; public final int getNumber() { return value; } @@ -5707,6 +5711,8 @@ public static Kind valueOf(int value) { switch (value) { case 0: return DIRECT; case 1: return DICTIONARY; + case 2: return DIRECT_V2; + case 3: return DICTIONARY_V2; default: return null; } } @@ -5737,7 +5743,7 @@ public Kind findValueByNumber(int number) { } private static final Kind[] VALUES = { - DIRECT, DICTIONARY, + DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2, }; public static Kind valueOf( @@ -11117,42 +11123,42 @@ void setMagic(com.google.protobuf.ByteString value) { "eam.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004", "\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGT" + "H\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_C" + - "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n" + + "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n" + "\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.apac" + "he.hadoop.hive.ql.io.orc.ColumnEncoding." + - "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006" + - "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFoote" + - "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" + - "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." 
+ - "org.apache.hadoop.hive.ql.io.orc.ColumnE", - "ncoding\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" + - "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s" + - "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001" + - "\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002" + - "\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE" + - "\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020" + - "\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNIO" + - "N\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInfo" + - "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" + - " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength", - "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" + - "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002" + - "\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconten" + - "tLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apa" + - "che.hadoop.hive.ql.io.orc.StripeInformat" + - "ion\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.h" + - "ive.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.o" + - "rg.apache.hadoop.hive.ql.io.orc.UserMeta" + - "dataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatis" + - "tics\030\007 \003(\01322.org.apache.hadoop.hive.ql.i", - "o.orc.ColumnStatistics\022\026\n\016rowIndexStride" + - "\030\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 " + - "\001(\004\022F\n\013compression\030\002 \001(\01621.org.apache.ha" + - "doop.hive.ql.io.orc.CompressionKind\022\034\n\024c" + - "ompressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003" + - "(\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKin" + - "d\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO" + - "\020\003" + "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006" + + "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022" + + "\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n\007s" + + "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" + + ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap", + "ache.hadoop.hive.ql.io.orc.ColumnEncodin" + + "g\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" + + "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" + + "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001\n\004Kind" + + "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" + + "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" + + "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" + + "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" + + "\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInformatio" + + "n\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022", + "\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004" + + "\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataIt" + + "em\022\014\n\004name\030\001 
\002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Foot" + + "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" + + "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" + + "doop.hive.ql.io.orc.StripeInformation\0225\n" + + "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql" + + ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" + + "che.hadoop.hive.ql.io.orc.UserMetadataIt" + + "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007", + " \003(\01322.org.apache.hadoop.hive.ql.io.orc." + + "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" + + "\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" + + "\013compression\030\002 \001(\01621.org.apache.hadoop.h" + + "ive.ql.io.orc.CompressionKind\022\034\n\024compres" + + "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" + + "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" + + "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java new file mode 100644 index 0000000..04cfa58 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for reading integers. + */ +interface IntegerReader { + + /** + * Seek to the position provided by index. + * @param index + * @throws IOException + */ + void seek(PositionProvider index) throws IOException; + + /** + * Skip number of specified rows. + * @param numValues + * @throws IOException + */ + void skip(long numValues) throws IOException; + + /** + * Check if there are any more values left. + * @return + * @throws IOException + */ + boolean hasNext() throws IOException; + + /** + * Return the next available value. + * @return + * @throws IOException + */ + long next() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java new file mode 100644 index 0000000..775d02e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for writing integers. + */ +interface IntegerWriter { + + /** + * Get position from the stream. + * @param recorder + * @throws IOException + */ + void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Write the integer value + * @param value + * @throws IOException + */ + void write(long value) throws IOException; + + /** + * Flush the buffer + * @throws IOException + */ + void flush() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 3038e26..18c5ac8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -130,6 +131,21 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { } } + IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind, + InStream in, + boolean signed) throws IOException { + switch (kind) { + case DIRECT_V2: + case DICTIONARY_V2: + return new RunLengthIntegerReaderV2(in, signed); + case DIRECT: + case DICTIONARY: + return new RunLengthIntegerReader(in, signed); + default: + throw new IllegalArgumentException("Unknown encoding " + kind); + } + } + void startStripe(Map streams, List encoding ) throws IOException { @@ -266,20 +282,29 @@ void skipRows(long items) throws IOException { } private static class ShortTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; ShortTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -310,20 +335,29 @@ void skipRows(long items) throws 
IOException { } private static class IntTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; IntTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -354,20 +388,29 @@ void skipRows(long items) throws IOException { } private static class LongTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; LongTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -492,13 +535,22 @@ void skipRows(long items) throws IOException { private static class BinaryTreeReader extends TreeReader{ private InStream stream; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; BinaryTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { @@ -506,9 +558,8 @@ void startStripe(Map streams, StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), - false); + lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false); } @Override @@ -555,22 +606,33 @@ void skipRows(long items) throws IOException { } private static class TimestampTreeReader extends TreeReader{ - private RunLengthIntegerReader data; - private RunLengthIntegerReader nanos; + private IntegerReader data = null; + private IntegerReader nanos = null; TimestampTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + 
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.DATA)), true); - nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.SECONDARY)), false); + data = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); } @Override @@ -625,20 +687,29 @@ void skipRows(long items) throws IOException { } private static class DateTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; DateTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -670,20 +741,29 @@ void skipRows(long items) throws IOException { private static class DecimalTreeReader extends TreeReader{ private InStream valueStream; - private RunLengthIntegerReader scaleStream; + private IntegerReader scaleStream = null; DecimalTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); valueStream = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA)); - scaleStream = new RunLengthIntegerReader(streams.get( + scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get( new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); } @@ -726,12 +806,9 @@ void skipRows(long items) throws IOException { super(path, columnId); } + @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY && - encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { - throw new IOException("Unknown encoding " + encoding + " in column " + - columnId + " of " + path); - } + reader.checkEncoding(encoding); } @Override @@ -742,9 +819,11 @@ void startStripe(Map streams, // reader switch (encodings.get(columnId).getKind()) { case DIRECT: + case DIRECT_V2: reader = new StringDirectTreeReader(path, columnId); break; case DICTIONARY: + case 
DICTIONARY_V2: reader = new StringDictionaryTreeReader(path, columnId); break; default: @@ -776,7 +855,7 @@ void skipRows(long items) throws IOException { */ private static class StringDirectTreeReader extends TreeReader { private InStream stream; - private RunLengthIntegerReader lengths; + private IntegerReader lengths; StringDirectTreeReader(Path path, int columnId) { super(path, columnId); @@ -784,7 +863,11 @@ void skipRows(long items) throws IOException { @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - // PASS + if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT && + encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } } @Override @@ -795,8 +878,8 @@ void startStripe(Map streams, StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + lengths = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false); } @@ -851,7 +934,7 @@ void skipRows(long items) throws IOException { private static class StringDictionaryTreeReader extends TreeReader { private DynamicByteArray dictionaryBuffer; private int[] dictionaryOffsets; - private RunLengthIntegerReader reader; + private IntegerReader reader; StringDictionaryTreeReader(Path path, int columnId) { super(path, columnId); @@ -859,7 +942,11 @@ void skipRows(long items) throws IOException { @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - // PASS + if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY && + encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } } @Override @@ -884,7 +971,8 @@ void startStripe(Map streams, // read the lengths name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); in = streams.get(name); - RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); + IntegerReader lenReader = createIntegerReader(encodings.get(columnId) + .getKind(), in, false); int offset = 0; if (dictionaryOffsets == null || dictionaryOffsets.length < dictionarySize + 1) { @@ -899,7 +987,8 @@ void startStripe(Map streams, // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), false); + reader = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(name), false); } @Override @@ -1097,7 +1186,7 @@ void skipRows(long items) throws IOException { private static class ListTreeReader extends TreeReader { private final TreeReader elementReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; ListTreeReader(Path path, int columnId, List types, @@ -1146,12 +1235,22 @@ Object next(Object previous) throws IOException { } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, 
encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + lengths = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); if (elementReader != null) { elementReader.startStripe(streams, encodings); } @@ -1171,7 +1270,7 @@ void skipRows(long items) throws IOException { private static class MapTreeReader extends TreeReader { private final TreeReader keyReader; private final TreeReader valueReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; MapTreeReader(Path path, int columnId, @@ -1224,12 +1323,22 @@ Object next(Object previous) throws IOException { } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + lengths = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); if (keyReader != null) { keyReader.startStripe(streams, encodings); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java index 2825c64..b737b0f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java @@ -23,7 +23,7 @@ /** * A reader that reads a sequence of integers. * */ -class RunLengthIntegerReader { +class RunLengthIntegerReader implements IntegerReader { private final InStream input; private final boolean signed; private final long[] literals = @@ -71,11 +71,13 @@ private void readValues() throws IOException { } } - boolean hasNext() throws IOException { + @Override + public boolean hasNext() throws IOException { return used != numLiterals || input.available() > 0; } - long next() throws IOException { + @Override + public long next() throws IOException { long result; if (used == numLiterals) { readValues(); @@ -88,7 +90,8 @@ long next() throws IOException { return result; } - void seek(PositionProvider index) throws IOException { + @Override + public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); if (consumed != 0) { @@ -104,7 +107,8 @@ void seek(PositionProvider index) throws IOException { } } - void skip(long numValues) throws IOException { + @Override + public void skip(long numValues) throws IOException { while (numValues > 0) { if (used == numLiterals) { readValues(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java new file mode 100644 index 0000000..4a4165f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; + +/** + * A reader that reads a sequence of light weight compressed integers. Refer + * {@link RunLengthIntegerWriterV2} for description of various lightweight + * compression techniques. + */ +class RunLengthIntegerReaderV2 implements IntegerReader { + private final InStream input; + private final boolean signed; + private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE]; + private int numLiterals = 0; + private int used = 0; + + RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException { + this.input = input; + this.signed = signed; + } + + private void readValues() throws IOException { + // read the first 2 bits and determine the encoding type + int firstByte = input.read(); + if (firstByte < 0) { + throw new EOFException("Read past end of RLE integer from " + input); + } else { + int enc = (firstByte >>> 6) & 0x03; + if (EncodingType.SHORT_REPEAT.ordinal() == enc) { + readShortRepeatValues(firstByte); + } else if (EncodingType.DIRECT.ordinal() == enc) { + readDirectValues(firstByte); + } else if (EncodingType.PATCHED_BASE.ordinal() == enc) { + readPatchedBaseValues(firstByte); + } else { + readDeltaValues(firstByte); + } + } + } + + private void readDeltaValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fb = (firstByte >>> 1) & 0x1f; + if (fb != 0) { + fb = SerializationUtils.decodeBitWidth(fb); + } + + // extract the blob run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + + // read the first value stored as vint + long firstVal = 0; + if (signed) { + firstVal = SerializationUtils.readVslong(input); + } else { + firstVal = SerializationUtils.readVulong(input); + } + + // store first value to result buffer + long prevVal = firstVal; + literals[numLiterals++] = firstVal; + + // if fixed bits is 0 then all values have fixed delta + if (fb == 0) { + // read the fixed delta value stored as vint (deltas can be negative even + // if all number are positive) + long fd = SerializationUtils.readVslong(input); + + // add fixed deltas to adjacent values + for(int i = 0; i < len; i++) { + literals[numLiterals++] = literals[numLiterals - 2] + fd; + } + } else { + long deltaBase = SerializationUtils.readVslong(input); + // add delta base and first value + literals[numLiterals++] = firstVal + deltaBase; + prevVal = literals[numLiterals - 1]; + len -= 1; + + // write the unpacked values, add it to previous value and store final + // value to result buffer. 
if the delta base value is negative then it + // is a decreasing sequence else an increasing sequence + SerializationUtils.readInts(literals, numLiterals, len, fb, input); + while (len > 0) { + if (deltaBase < 0) { + literals[numLiterals] = prevVal - literals[numLiterals]; + } else { + literals[numLiterals] = prevVal + literals[numLiterals]; + } + prevVal = literals[numLiterals]; + len--; + numLiterals++; + } + } + } + + private void readPatchedBaseValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = SerializationUtils.decodeBitWidth(fbo); + + // extract the run length of data blob + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are always one off + len += 1; + + // extract the number of bytes occupied by base + int thirdByte = input.read(); + int bw = (thirdByte >>> 5) & 0x07; + // base width is one off + bw += 1; + + // extract patch width + int pwo = thirdByte & 0x1f; + int pw = SerializationUtils.decodeBitWidth(pwo); + + // read fourth byte and extract patch gap width + int fourthByte = input.read(); + int pgw = (fourthByte >>> 5) & 0x07; + // patch gap width is one off + pgw += 1; + + // extract the length of the patch list + int pl = fourthByte & 0x1f; + + // read the next base width number of bytes to extract base value + long base = SerializationUtils.bytesToLongBE(input, bw); + long mask = (1L << ((bw * 8) - 1)); + // if MSB of base value is 1 then base is negative value else positive + if ((base & mask) != 0) { + base = base & ~mask; + base = -base; + } + + // unpack the data blob + long[] unpacked = new long[len]; + SerializationUtils.readInts(unpacked, 0, len, fb, input); + + // unpack the patch blob + long[] unpackedPatch = new long[pl]; + SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input); + + // apply the patch directly when decoding the packed data + int patchIdx = 0; + long currGap = 0; + long currPatch = 0; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + long actualGap = 0; + + // special case: gap is >255 then patch value will be 0. + // if gap is <=255 then patch value cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // unpack data blob, patch it (if required), add base to get final result + for(int i = 0; i < unpacked.length; i++) { + if (i == actualGap) { + // extract the patch value + long patchedVal = unpacked[i] | (currPatch << fb); + + // add base to patched value + literals[numLiterals++] = base + patchedVal; + + // increment the patch to point to next entry in patch list + patchIdx++; + + if (patchIdx < pl) { + // read the next gap and patch + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + actualGap = 0; + + // special case: gap is >255 then patch will be 0. if gap is + // <=255 then patch cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // next gap is relative to the current gap + actualGap += i; + } + } else { + // no patching required. 
add base to unpacked value to get final value + literals[numLiterals++] = base + unpacked[i]; + } + } + + } + + private void readDirectValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = SerializationUtils.decodeBitWidth(fbo); + + // extract the run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are one off + len += 1; + + // write the unpacked values and zigzag decode to result buffer + SerializationUtils.readInts(literals, numLiterals, len, fb, input); + if (signed) { + for(int i = 0; i < len; i++) { + literals[numLiterals] = SerializationUtils + .zigzagDecode(literals[numLiterals]); + numLiterals++; + } + } else { + numLiterals += len; + } + } + + private void readShortRepeatValues(int firstByte) throws IOException { + + // read the number of bytes occupied by the value + int size = (firstByte >>> 3) & 0x07; + // #bytes are one off + size += 1; + + // read the run length + int len = firstByte & 0x07; + // run lengths values are stored only after MIN_REPEAT value is met + len += RunLengthIntegerWriterV2.MIN_REPEAT; + + // read the repeated value which is store using fixed bytes + long val = SerializationUtils.bytesToLongBE(input, size); + + if (signed) { + val = SerializationUtils.zigzagDecode(val); + } + + // repeat the value for length times + for(int i = 0; i < len; i++) { + literals[numLiterals++] = val; + } + } + + @Override + public boolean hasNext() throws IOException { + return used != numLiterals || input.available() > 0; + } + + @Override + public long next() throws IOException { + long result; + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + result = literals[used++]; + return result; + } + + @Override + public void seek(PositionProvider index) throws IOException { + input.seek(index); + int consumed = (int) index.getNext(); + if (consumed != 0) { + // a loop is required for cases where we break the run into two + // parts + while (consumed > 0) { + numLiterals = 0; + readValues(); + used = consumed; + consumed -= numLiterals; + } + } else { + used = 0; + numLiterals = 0; + } + } + + @Override + public void skip(long numValues) throws IOException { + while (numValues > 0) { + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + long consume = Math.min(numValues, numLiterals - used); + used += consume; + numValues -= consume; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java index aaca0a1..539f8df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java @@ -25,7 +25,7 @@ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128 * literal vint values follow. 
*/ -class RunLengthIntegerWriter { +class RunLengthIntegerWriter implements IntegerWriter { static final int MIN_REPEAT_SIZE = 3; static final int MAX_DELTA = 127; static final int MIN_DELTA = -128; @@ -71,12 +71,14 @@ private void writeValues() throws IOException { } } - void flush() throws IOException { + @Override + public void flush() throws IOException { writeValues(); output.flush(); } - void write(long value) throws IOException { + @Override + public void write(long value) throws IOException { if (numLiterals == 0) { literals[numLiterals++] = value; tailRunLength = 1; @@ -130,8 +132,10 @@ void write(long value) throws IOException { } } - void getPosition(PositionRecorder recorder) throws IOException { + @Override + public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(numLiterals); } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java new file mode 100644 index 0000000..a4497b3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java @@ -0,0 +1,817 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * A writer that performs light weight compression over sequence of integers. + *

+ * There are four types of lightweight integer compression:
+ *   - SHORT_REPEAT
+ *   - DIRECT
+ *   - PATCHED_BASE
+ *   - DELTA
+ *
+ * The description and format for these types are as below:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *   - 1 byte header
+ *     - 2 bits for encoding type
+ *     - 3 bits for bytes required for repeating value
+ *     - 3 bits for repeat count (MIN_REPEAT + run length)
+ *   - Blob - repeat value (fixed bytes)
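+ *
+ * For illustration (an unsigned stream, so no zigzag encoding is applied):
+ * the run [10000, 10000, 10000, 10000, 10000] repeats 10000 (0x2710, 2 bytes)
+ * five times, so the header byte is 0x0a (type 0, bytes - 1 = 1, run length
+ * 5 - MIN_REPEAT = 2) and the full output is 0x0a 0x27 0x10.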
+ *

+ *

+ *
+ * DIRECT: Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ *   - 2 bytes header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *   - Blob - stores the direct values using fixed bit width. The length of
+ *     the data blob is (fixed width * run length) bits long.
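+ *
+ * For illustration (unsigned values, assuming MSB-first bit packing): the
+ * sequence [23713, 43806, 57005, 48879] needs 16 bits per value, so the
+ * header is 0x5e 0x03 (type 1, width 16, run length 4 - 1 = 3) followed by
+ * the packed values 0x5c 0xa1 0xab 0x1e 0xde 0xad 0xbe 0xef.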
+ *

+ *

+ *
+ * PATCHED_BASE: Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ *   - 4 bytes header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *     - 3rd byte
+ *       - 3 bits for bytes required to encode base value
+ *       - 5 bits for patch width
+ *     - 4th byte
+ *       - 3 bits for patch gap width
+ *       - 5 bits for patch length
+ *   - Base value - stored using a fixed number of bytes. If the MSB is set,
+ *     the base value is negative, else positive. Length of the base value is
+ *     (base width * 8) bits.
+ *   - Data blob - base reduced values stored using fixed bit width. Length of
+ *     the data blob is (fixed width * run length) bits.
+ *   - Patch blob - a list of gap and patch value entries. Each entry in the
+ *     patch list is (patch width + patch gap width) bits long. The gap between
+ *     the subsequent elements to be patched is stored in the upper part of the
+ *     entry, whereas the patch values are stored in the lower part. Length of
+ *     the patch blob is ((patch width + patch gap width) * patch length) bits.
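+ *
+ * For illustration: in a run such as [2030, 2000, 2020, 1000000, 2040, ...]
+ * the minimum 2000 becomes the base, the base reduced values fit in a small
+ * fixed bit width, and the single large outlier is recorded in the patch
+ * blob instead of widening the bit width for the whole run.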
+ *

+ *

+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ *   - 2 bytes header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *   - Base value - encoded as varint
+ *   - Delta base - encoded as varint
+ *   - Delta blob - only positive values. Monotonicity and ordering are decided
+ *     based on the sign of the base value and delta base.
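+ *
+ * For illustration (unsigned, fixed delta): the run [1, 2, 3, 4, 5] is
+ * written as header 0xc0 0x04 (type 3, width bits 0 for a fixed delta,
+ * run length 5 - 1 = 4), base value 0x01 (varint) and delta base 0x02
+ * (the delta 1 as a zigzag encoded signed varint); the delta blob is empty.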
+ */ +class RunLengthIntegerWriterV2 implements IntegerWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private final long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding; + private int numLiterals; + private long[] zigzagLiterals; + private long[] baseRedLiterals; + private long[] adjDeltas; + private long fixedDelta; + private int zzBits90p; + private int zzBits100p; + private int brBits95p; + private int brBits100p; + private int bitsDeltaMax; + private int patchWidth; + private int patchGapWidth; + private int patchLength; + private long[] gapVsPatchList; + private long min; + private boolean isFixedDelta; + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this.output = output; + this.signed = signed; + clear(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + int len = 0; + int fb = bitsDeltaMax; + int efb = 0; + + if (isFixedDelta) { + // if fixed run length is greater than threshold then it will be fixed + // delta sequence with delta value 0 else fixed delta sequence with + // non-zero delta value + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for long repeating values. 
+ // sequences that require only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = SerializationUtils.encodeBitWidth(fb); + efb = efb << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + SerializationUtils.writeVslong(output, literals[0]); + } else { + SerializationUtils.writeVulong(output, literals[0]); + } + + if (isFixedDelta) { + // if delta is fixed then we don't need to store delta blob + SerializationUtils.writeVslong(output, fixedDelta); + } else { + // store the first value as delta value using zigzag encoding + SerializationUtils.writeVslong(output, adjDeltas[0]); + // adjacent delta values are bit packed + SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, + output); + } + } + + private void writePatchedBaseValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int fb = brBits95p; + int efb = SerializationUtils.encodeBitWidth(fb) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = variableRunLength & 0xff; + + // if the min value is negative toggle the sign + boolean isNegative = min < 0 ? true : false; + if (isNegative) { + min = -min; + } + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width. The additional bit is used to store the sign + // of the base value. + int baseWidth = SerializationUtils.findClosestNumBits(min) + 1; + int baseBytes = baseWidth % 8 == 0 ? 
baseWidth / 8 : (baseWidth / 8) + 1; + int bb = (baseBytes - 1) << 5; + + // if the base value is negative then set MSB to 1 + if (isNegative) { + min |= (1L << ((baseBytes * 8) - 1)); + } + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for(int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((min >>> (i * 8)) & 0xff); + output.write(b); + } + + // base reduced literals are bit packed + int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p); + SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, + closestFixedBits, output); + + // write patch list + closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth + + patchWidth); + SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, + closestFixedBits, output); + + // reset run length + variableRunLength = 0; + } + + /** + * Store the opcode in 2 MSB bits + * @return opcode + */ + private int getOpcode() { + return encoding.ordinal() << 6; + } + + private void writeDirectValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit packing the zigzag encoded literals + SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, + zzBits100p, output); + + // reset run length + variableRunLength = 0; + } + + private void writeShortRepeatValues() throws IOException { + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = SerializationUtils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal); + int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? 
numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // write encoding type in top 2 bits + int header = getOpcode(); + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the repeating value in big endian byte order + for(int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } + + private void determineEncoding() { + // used for direct encoding + zigzagLiterals = new long[numLiterals]; + + // used for patched base encoding + baseRedLiterals = new long[numLiterals]; + + // used for delta encoding + adjDeltas = new long[numLiterals - 1]; + + int idx = 0; + + // for identifying monotonic sequences + boolean isIncreasing = false; + int increasingCount = 1; + boolean isDecreasing = false; + int decreasingCount = 1; + + // for identifying type of delta encoding + min = literals[0]; + long max = literals[0]; + isFixedDelta = true; + long currDelta = 0; + + min = literals[0]; + long deltaMax = 0; + + // populate all variables to identify the encoding type + if (numLiterals >= 1) { + currDelta = literals[1] - literals[0]; + for(int i = 0; i < numLiterals; i++) { + if (i > 0 && literals[i] >= max) { + max = literals[i]; + increasingCount++; + } + + if (i > 0 && literals[i] <= min) { + min = literals[i]; + decreasingCount++; + } + + // if delta doesn't changes then mark it as fixed delta + if (i > 0 && isFixedDelta) { + if (literals[i] - literals[i - 1] != currDelta) { + isFixedDelta = false; + } + + fixedDelta = currDelta; + } + + // populate zigzag encoded literals + long zzEncVal = 0; + if (signed) { + zzEncVal = SerializationUtils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[idx] = zzEncVal; + idx++; + + // max delta value is required for computing the fixed bits + // required for delta blob in delta encoding + if (i > 0) { + if (i == 1) { + // first value preserve the sign + adjDeltas[i - 1] = literals[i] - literals[i - 1]; + } else { + adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]); + if (adjDeltas[i - 1] > deltaMax) { + deltaMax = adjDeltas[i - 1]; + } + } + } + } + + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax); + + // if decreasing count equals total number of literals then the + // sequence is monotonically decreasing + if (increasingCount == 1 && decreasingCount == numLiterals) { + isDecreasing = true; + } + + // if increasing count equals total number of literals then the + // sequence is monotonically increasing + if (decreasingCount == 1 && increasingCount == numLiterals) { + isIncreasing = true; + } + } + + // if the sequence is both increasing and decreasing then it is not + // monotonic + if (isDecreasing && isIncreasing) { + isDecreasing = false; + isIncreasing = false; + } + + // fixed delta condition + if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) { + encoding = EncodingType.DELTA; + return; + } + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. 
if the variation + // is not significant then we can use direct or delta encoding + + double p = 0.9; + zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p); + + p = 1.0; + zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p); + + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1 + && isFixedDelta == false) { + // patching is done only on base reduced values. + // remove base from literals + for(int i = 0; i < zigzagLiterals.length; i++) { + baseRedLiterals[i] = literals[i] - min; + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + p = 0.95; + brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p); + + // 100th percentile is used to compute the max patch width + p = 1.0; + brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. + if ((brBits100p - brBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } + + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. Hence we will fallback to direct + if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 + && isFixedDelta == false) { + encoding = EncodingType.DIRECT; + return; + } + + // this should not happen + if (encoding == null) { + throw new RuntimeException("Integer encoding cannot be determined."); + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + int mask = (1 << brBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05)); + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = brBits100p - brBits95p; + patchWidth = SerializationUtils.getClosestFixedBits(patchWidth); + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for(int i = 0; i < baseRedLiterals.length; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value index + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask bits + long patch = baseRedLiterals[i] >>> brBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + patchGapWidth = 
SerializationUtils.findClosestNumBits(maxGap); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. To deal with this case, we will save + // two entries in patch list in the following way + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + // We will do the same for gap width = 511. If the element to be patched is + // the last element in the scope then gap width will be 511. In this case we + // will have 3 entries in the patch list in the following way + // 255 gap width => 0 for patch value + // 255 gap width => 0 for patch value + // 1 gap width => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for(int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255 << patchWidth) | 0; + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + encoding = null; + prevDelta = 0; + zigzagLiterals = null; + baseRedLiterals = null; + adjDeltas = null; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + brBits95p = 0; + brBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + min = 0; + isFixedDelta = false; + } + + @Override + public void flush() throws IOException { + if (numLiterals != 0) { + if (variableRunLength != 0) { + determineEncoding(); + writeValues(); + } else if (fixedRunLength != 0) { + if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + } + output.flush(); + } + + @Override + public void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = new long[MIN_REPEAT]; 
+ System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for(long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and if it satisfies the + // short repeat conditions then write the values as short repeats + // else use delta encoding + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + + // if fixed run length is 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reach the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java index 67762b5..1cd84f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java @@ -185,4 +185,279 @@ static BigInteger readBigInteger(InputStream input) throws IOException { result = result.shiftRight(1); return result; } + + enum FixedBitSizes { + ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, + THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, + TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, + TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR; + } + + /** + * Count the number of bits required to encode the given value + * @param value + * @return bits required to store value + */ + static int findClosestNumBits(long value) { + int count = 0; + while (value > 0) { + count++; + value = value >>> 1; + } + return getClosestFixedBits(count); + } + + /** + * zigzag encode the given value + * @param val + * @return zigzag encoded value + */ + static long zigzagEncode(long val) { + return (val << 1) ^ (val >> 63); + } + + /** + * zigzag decode the given value + * @param val + * @return zizag decoded value + */ + static long zigzagDecode(long val) { + return (val >>> 1) ^ -(val & 1); + } + + /** + * Compute the bits required to represent pth percentile value + * @param data - array + * @param p - percentile value (>=0.0 to <=1.0) + * @return pth percentile bits + */ + static int percentileBits(long[] data, double p) { + if ((p > 1.0) || (p <= 0.0)) { + return -1; + } + + // histogram that store the encoded bit requirement for each values. 
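zigzagEncode/zigzagDecode above interleave negative and positive values so that small magnitudes, regardless of sign, map to small unsigned codes, which is what makes the later bit packing effective. A tiny self-contained demo of the same transform (the class name is illustrative, not from the patch):

    public class ZigzagDemo {
      // same expressions as SerializationUtils.zigzagEncode/zigzagDecode
      static long encode(long val) { return (val << 1) ^ (val >> 63); }
      static long decode(long val) { return (val >>> 1) ^ -(val & 1); }

      public static void main(String[] args) {
        // mapping: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
        long[] samples = {0, -1, 1, -2, 2, Long.MIN_VALUE, Long.MAX_VALUE};
        for (long v : samples) {
          long z = encode(v);
          System.out.println(v + " -> " + z + " -> " + decode(z));
        }
      }
    }

The round trip is exact across the whole long range, including Long.MIN_VALUE and Long.MAX_VALUE.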
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes) + int[] hist = new int[32]; + + // compute the histogram + for(long l : data) { + int idx = encodeBitWidth(findClosestNumBits(l)); + hist[idx] += 1; + } + + int len = data.length; + int perLen = (int) (len * (1.0 - p)); + + // return the bits required by pth percentile length + for(int i = hist.length - 1; i >= 0; i--) { + perLen -= hist[i]; + if (perLen < 0) { + return decodeBitWidth(i); + } + } + + return 0; + } + + /** + * Read n bytes in big endian order and convert to long + * @param b - byte array + * @return long value + */ + static long bytesToLongBE(InStream input, int n) throws IOException { + long out = 0; + long val = 0; + while (n > 0) { + n--; + // store it in a long and then shift else integer overflow will occur + val = input.read(); + out |= (val << (n * 8)); + } + return out; + } + + /** + * Calculate the number of bytes required + * @param n - number of values + * @param numBits - bit width + * @return number of bytes required + */ + static int getTotalBytesRequired(int n, int numBits) { + return (n * numBits + 7) / 8; + } + + /** + * For a given fixed bit this function will return the closest available fixed + * bit + * @param n + * @return closest valid fixed bit + */ + static int getClosestFixedBits(int n) { + if (n == 0) { + return 1; + } + + if (n >= 1 && n <= 24) { + return n; + } else if (n > 24 && n <= 26) { + return 26; + } else if (n > 26 && n <= 28) { + return 28; + } else if (n > 28 && n <= 30) { + return 30; + } else if (n > 30 && n <= 32) { + return 32; + } else if (n > 32 && n <= 40) { + return 40; + } else if (n > 40 && n <= 48) { + return 48; + } else if (n > 48 && n <= 56) { + return 56; + } else { + return 64; + } + } + + /** + * Finds the closest available fixed bit width match and returns its encoded + * value (ordinal) + * @param n - fixed bit width to encode + * @return encoded fixed bit width + */ + static int encodeBitWidth(int n) { + n = getClosestFixedBits(n); + + if (n >= 1 && n <= 24) { + return n - 1; + } else if (n > 24 && n <= 26) { + return FixedBitSizes.TWENTYSIX.ordinal(); + } else if (n > 26 && n <= 28) { + return FixedBitSizes.TWENTYEIGHT.ordinal(); + } else if (n > 28 && n <= 30) { + return FixedBitSizes.THIRTY.ordinal(); + } else if (n > 30 && n <= 32) { + return FixedBitSizes.THIRTYTWO.ordinal(); + } else if (n > 32 && n <= 40) { + return FixedBitSizes.FORTY.ordinal(); + } else if (n > 40 && n <= 48) { + return FixedBitSizes.FORTYEIGHT.ordinal(); + } else if (n > 48 && n <= 56) { + return FixedBitSizes.FIFTYSIX.ordinal(); + } else { + return FixedBitSizes.SIXTYFOUR.ordinal(); + } + } + + /** + * Decodes the ordinal fixed bit value to actual fixed bit width value + * @param n - encoded fixed bit width + * @return decoded fixed bit width + */ + static int decodeBitWidth(int n) { + if (n >= FixedBitSizes.ONE.ordinal() + && n <= FixedBitSizes.TWENTYFOUR.ordinal()) { + return n + 1; + } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) { + return 26; + } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) { + return 28; + } else if (n == FixedBitSizes.THIRTY.ordinal()) { + return 30; + } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) { + return 32; + } else if (n == FixedBitSizes.FORTY.ordinal()) { + return 40; + } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) { + return 48; + } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) { + return 56; + } else { + return 64; + } + } + + /** + * Bitpack and write the input values to underlying output stream + * @param 
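percentileBits() above is what lets the writer pick a narrow bit width for the bulk of a run and patch only the outliers. A simplified standalone sketch of the same idea follows; it buckets by raw bit count rather than the FixedBitSizes ordinals used in the patch, and the sample data is invented for illustration:

    public class PercentileBitsDemo {
      // raw bit count of a non-negative value, minimum 1
      static int bits(long value) {
        int count = 0;
        while (value != 0) { count++; value >>>= 1; }
        return Math.max(count, 1);
      }

      // smallest width that covers the p-th percentile of the data
      static int percentileBits(long[] data, double p) {
        int[] hist = new int[65];
        for (long v : data) hist[bits(v)]++;
        int remaining = (int) (data.length * (1.0 - p));
        for (int w = 64; w >= 1; w--) {
          remaining -= hist[w];
          if (remaining < 0) return w;
        }
        return 0;
      }

      public static void main(String[] args) {
        // 95 small values (4 bits) and 5 outliers (16 bits)
        long[] data = new long[100];
        for (int i = 0; i < 100; i++) data[i] = (i < 95) ? 9 : 40000;
        System.out.println("p=0.95 -> " + percentileBits(data, 0.95) + " bits");
        System.out.println("p=1.00 -> " + percentileBits(data, 1.00) + " bits");
      }
    }

For this sample it prints 4 bits at the 95th percentile versus 16 bits at the 100th; that gap between the two widths is what the patched encoding exploits.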
input - values to write + * @param offset - offset + * @param len - length + * @param bitSize - bit width + * @param output - output stream + * @throws IOException + */ + static void writeInts(long[] input, int offset, int len, int bitSize, + OutputStream output) throws IOException { + if (input == null || input.length < 1 || offset < 0 || len < 1 + || bitSize < 1) { + return; + } + + int bitsLeft = 8; + byte current = 0; + for(int i = offset; i < (offset + len); i++) { + long value = input[i]; + int bitsToWrite = bitSize; + while (bitsToWrite > bitsLeft) { + // add the bits to the bottom of the current word + current |= value >>> (bitsToWrite - bitsLeft); + // subtract out the bits we just added + bitsToWrite -= bitsLeft; + // zero out the bits above bitsToWrite + value &= (1L << bitsToWrite) - 1; + output.write(current); + current = 0; + bitsLeft = 8; + } + bitsLeft -= bitsToWrite; + current |= value << bitsLeft; + if (bitsLeft == 0) { + output.write(current); + current = 0; + bitsLeft = 8; + } + } + + // flush + if (bitsLeft != 8) { + output.write(current); + current = 0; + bitsLeft = 8; + } + } + + /** + * Read bitpacked integers from input stream + * @param buffer - input buffer + * @param offset - offset + * @param len - length + * @param bitSize - bit width + * @param input - input stream + * @throws IOException + */ + static void readInts(long[] buffer, int offset, int len, int bitSize, + InStream input) throws IOException { + int bitsLeft = 0; + int current = 0; + + for(int i = offset; i < (offset + len); i++) { + long result = 0; + int bitsLeftToRead = bitSize; + while (bitsLeftToRead > bitsLeft) { + result <<= bitsLeft; + result |= current & ((1 << bitsLeft) - 1); + bitsLeftToRead -= bitsLeft; + current = input.read(); + bitsLeft = 8; + } + + // handle the left over bits + if (bitsLeftToRead > 0) { + result <<= bitsLeftToRead; + bitsLeft -= bitsLeftToRead; + result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1); + } + buffer[i] = result; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index e2d7e56..c1052c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -381,6 +381,7 @@ public Configuration getConfiguration() { private final PositionedOutputStream rowIndexStream; private boolean foundNulls; private OutStream isPresentOutStream; + protected final boolean useDirectV2Encoding; /** * Create a tree writer. @@ -396,6 +397,7 @@ public Configuration getConfiguration() { this.isCompressed = streamFactory.isCompressed(); this.id = columnId; this.inspector = inspector; + this.useDirectV2Encoding = true; if (nullable) { isPresentOutStream = streamFactory.createStream(id, OrcProto.Stream.Kind.PRESENT); @@ -430,6 +432,32 @@ protected ColumnStatisticsImpl getFileStatistics() { return rowIndexEntry; } + IntegerWriter createIntegerWriter(PositionedOutputStream output, + boolean signed, boolean isDirectV2) { + if (isDirectV2) { + return new RunLengthIntegerWriterV2(output, signed); + } else { + return new RunLengthIntegerWriter(output, signed); + } + } + + boolean isNewWriteFormat(StreamFactory writer) { + String writeFormat = writer.getConfiguration().get( + HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname); + if (writeFormat == null) { + LOG.warn("ORC write format not defined. 
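writeInts/readInts above are the MSB-first bit-packing primitives the V2 encodings sit on. Below is a self-contained round trip of the same algorithm against a plain byte array, so it does not depend on the OutStream/InStream classes; the class and method names are illustrative, not from the patch:

    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;

    public class BitPackDemo {
      // Pack values MSB-first into whole bytes, mirroring writeInts above.
      static byte[] pack(long[] values, int bitSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int bitsLeft = 8;
        int current = 0;
        for (long value : values) {
          int bitsToWrite = bitSize;
          while (bitsToWrite > bitsLeft) {
            // fill the rest of the current byte with the value's top bits
            current |= (int) (value >>> (bitsToWrite - bitsLeft));
            bitsToWrite -= bitsLeft;
            value &= (1L << bitsToWrite) - 1;   // keep only the unwritten bits
            out.write(current);
            current = 0;
            bitsLeft = 8;
          }
          bitsLeft -= bitsToWrite;
          current |= (int) (value << bitsLeft);
          if (bitsLeft == 0) {
            out.write(current);
            current = 0;
            bitsLeft = 8;
          }
        }
        if (bitsLeft != 8) {
          out.write(current);                   // flush the partly filled byte
        }
        return out.toByteArray();
      }

      // Unpack len values of bitSize bits each, mirroring readInts above.
      static long[] unpack(byte[] packed, int len, int bitSize) {
        long[] result = new long[len];
        int bitsLeft = 0;
        int current = 0;
        int pos = 0;
        for (int i = 0; i < len; i++) {
          long value = 0;
          int bitsToRead = bitSize;
          while (bitsToRead > bitsLeft) {
            value <<= bitsLeft;
            value |= current & ((1 << bitsLeft) - 1);
            bitsToRead -= bitsLeft;
            current = packed[pos++] & 0xFF;
            bitsLeft = 8;
          }
          if (bitsToRead > 0) {                 // leftover bits within a byte
            value <<= bitsToRead;
            bitsLeft -= bitsToRead;
            value |= (current >> bitsLeft) & ((1 << bitsToRead) - 1);
          }
          result[i] = value;
        }
        return result;
      }

      public static void main(String[] args) {
        long[] input = {1, 5, 7, 3, 6, 2, 0, 4};          // all fit in 3 bits
        byte[] packed = pack(input, 3);
        System.out.println(input.length + " x 3 bits -> " + packed.length + " bytes");
        System.out.println(Arrays.toString(unpack(packed, input.length, 3)));
      }
    }

Eight 3-bit values pack into three bytes and unpack to the original array, matching getTotalBytesRequired(8, 3).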
Using 0.12 ORC write format."); + return true; + } + if (writeFormat + .equals(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.defaultVal)) { + LOG.info("Using 0.11 ORC write format."); + return false; + } + + LOG.info("Using 0.12 ORC write format."); + return true; + } + /** * Add a new value to the column. * @param obj @@ -635,10 +663,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { } private static class IntegerTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter writer; + private final IntegerWriter writer; private final ShortObjectInspector shortInspector; private final IntObjectInspector intInspector; private final LongObjectInspector longInspector; + private boolean isDirectV2 = true; IntegerTreeWriter(int columnId, ObjectInspector inspector, @@ -647,7 +676,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.writer = new RunLengthIntegerWriter(out, true); + this.isDirectV2 = isNewWriteFormat(writer); + this.writer = createIntegerWriter(out, true, isDirectV2); if (inspector instanceof IntObjectInspector) { intInspector = (IntObjectInspector) inspector; shortInspector = null; @@ -666,6 +696,16 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override void write(Object obj) throws IOException { super.write(obj); if (obj != null) { @@ -776,13 +816,13 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class StringTreeWriter extends TreeWriter { private static final int INITIAL_DICTIONARY_SIZE = 4096; private final OutStream stringOutput; - private final RunLengthIntegerWriter lengthOutput; - private final RunLengthIntegerWriter rowOutput; + private final IntegerWriter lengthOutput; + private final IntegerWriter rowOutput; private final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); private final DynamicIntArray rows = new DynamicIntArray(); private final PositionedOutputStream directStreamOutput; - private final RunLengthIntegerWriter directLengthOutput; + private final IntegerWriter directLengthOutput; private final List savedRowIndex = new ArrayList(); private final boolean buildIndex; @@ -791,25 +831,26 @@ void recordPosition(PositionRecorder recorder) throws IOException { //the total number of non-null rows, turn off dictionary encoding private final float dictionaryKeySizeThreshold; private boolean useDictionaryEncoding = true; + private boolean isDirectV2 = true; StringTreeWriter(int columnId, ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); + this.isDirectV2 = isNewWriteFormat(writer); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); - lengthOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false); - rowOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), false); + lengthOutput = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2); + rowOutput = 
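isNewWriteFormat() above is the single switch between the old and new integer writers, driven by one configuration property. A minimal sketch of forcing the old behaviour (the property name and its effect come from the patch; the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class WriteFormatDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "0.11" keeps the pre-existing RunLengthIntegerWriter; leaving the
        // property unset or using any other value selects the new V2 writers.
        conf.set("hive.exec.orc.write.format", "0.11");
        System.out.println(conf.get("hive.exec.orc.write.format"));
      }
    }

The tests below (testBasicOld and testSeek) use exactly this setting to exercise the 0.11 path, while the rest of the new test suite runs against the V2 writers.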
createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), false, isDirectV2); recordPosition(rowIndexPosition); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); - directLengthOutput = - new RunLengthIntegerWriter(writer.createStream - (id, OrcProto.Stream.Kind.LENGTH), false); + directLengthOutput = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2); dictionaryKeySizeThreshold = writer.getConfiguration().getFloat( HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD. @@ -906,24 +947,23 @@ public void visit(StringRedBlackTree.VisitorContext context rowIndexValueCount.add(0L); } - // Calls getPosition on the row output stream if dictionary encoding is used, and the direct - // output stream if direct encoding is used - private void recordOutputPosition(OrcProto.RowIndexEntry.Builder base) throws IOException { - if (useDictionaryEncoding) { - rowOutput.getPosition(new RowIndexPositionRecorder(base)); - } else { - directStreamOutput.getPosition(new RowIndexPositionRecorder(base)); - } - } - @Override OrcProto.ColumnEncoding getEncoding() { // Returns the encoding used for the last call to writeStripe if (useDictionaryEncoding) { + if(isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DICTIONARY_V2). + setDictionarySize(dictionary.size()).build(); + } return OrcProto.ColumnEncoding.newBuilder().setKind( OrcProto.ColumnEncoding.Kind.DICTIONARY). setDictionarySize(dictionary.size()).build(); } else { + if(isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } return OrcProto.ColumnEncoding.newBuilder().setKind( OrcProto.ColumnEncoding.Kind.DIRECT).build(); } @@ -956,7 +996,8 @@ long estimateMemory() { private static class BinaryTreeWriter extends TreeWriter { private final PositionedOutputStream stream; - private final RunLengthIntegerWriter length; + private final IntegerWriter length; + private boolean isDirectV2 = true; BinaryTreeWriter(int columnId, ObjectInspector inspector, @@ -965,12 +1006,23 @@ long estimateMemory() { super(columnId, inspector, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.length = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false); + this.isDirectV2 = isNewWriteFormat(writer); + this.length = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2); recordPosition(rowIndexPosition); } @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override void write(Object obj) throws IOException { super.write(obj); if (obj != null) { @@ -1003,22 +1055,34 @@ void recordPosition(PositionRecorder recorder) throws IOException { Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND; private static class TimestampTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter seconds; - private final RunLengthIntegerWriter nanos; + private final IntegerWriter seconds; + private final IntegerWriter nanos; + private final boolean isDirectV2; TimestampTreeWriter(int columnId, 
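The getEncoding() overrides being added to each TreeWriter all follow the same pattern. Condensed into one decision table it looks like the sketch below; kindFor is not a method in the patch, and only the string writer ever reports a dictionary encoding:

    import org.apache.hadoop.hive.ql.io.orc.OrcProto;

    public class EncodingKindDemo {
      static OrcProto.ColumnEncoding.Kind kindFor(boolean useDictionary,
                                                  boolean directV2) {
        if (useDictionary) {
          return directV2 ? OrcProto.ColumnEncoding.Kind.DICTIONARY_V2
                          : OrcProto.ColumnEncoding.Kind.DICTIONARY;
        }
        return directV2 ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
                        : OrcProto.ColumnEncoding.Kind.DIRECT;
      }

      public static void main(String[] args) {
        System.out.println(kindFor(true, true));   // DICTIONARY_V2
        System.out.println(kindFor(false, true));  // DIRECT_V2
        System.out.println(kindFor(false, false)); // DIRECT
      }
    }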
ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); - this.seconds = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), true); - this.nanos = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), false); + this.isDirectV2 = isNewWriteFormat(writer); + this.seconds = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), true, isDirectV2); + this.nanos = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), false, isDirectV2); recordPosition(rowIndexPosition); } @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override void write(Object obj) throws IOException { super.write(obj); if (obj != null) { @@ -1064,7 +1128,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } private static class DateTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter writer; + private final IntegerWriter writer; + private final boolean isDirectV2; DateTreeWriter(int columnId, ObjectInspector inspector, @@ -1073,7 +1138,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.writer = new RunLengthIntegerWriter(out, true); + this.isDirectV2 = isNewWriteFormat(writer); + this.writer = createIntegerWriter(out, true, isDirectV2); recordPosition(rowIndexPosition); } @@ -1101,24 +1167,46 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); writer.getPosition(recorder); } + + @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } } private static class DecimalTreeWriter extends TreeWriter { private final PositionedOutputStream valueStream; - private final RunLengthIntegerWriter scaleStream; + private final IntegerWriter scaleStream; + private final boolean isDirectV2; DecimalTreeWriter(int columnId, ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); + this.isDirectV2 = isNewWriteFormat(writer); valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); - scaleStream = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), true); + this.scaleStream = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), true, isDirectV2); recordPosition(rowIndexPosition); } @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override void write(Object obj) throws IOException { super.write(obj); if (obj != null) { @@ -1191,25 +1279,36 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, } private static class ListTreeWriter extends TreeWriter { - private final 
RunLengthIntegerWriter lengths; + private final IntegerWriter lengths; + private final boolean isDirectV2; ListTreeWriter(int columnId, ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); + this.isDirectV2 = isNewWriteFormat(writer); ListObjectInspector listObjectInspector = (ListObjectInspector) inspector; childrenWriters = new TreeWriter[1]; childrenWriters[0] = createTreeWriter(listObjectInspector.getListElementObjectInspector(), writer, true); - lengths = - new RunLengthIntegerWriter(writer.createStream(columnId, - OrcProto.Stream.Kind.LENGTH), false); + lengths = createIntegerWriter(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2); recordPosition(rowIndexPosition); } @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override void write(Object obj) throws IOException { super.write(obj); if (obj != null) { @@ -1241,26 +1340,37 @@ void recordPosition(PositionRecorder recorder) throws IOException { } private static class MapTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter lengths; + private final IntegerWriter lengths; + private final boolean isDirectV2; MapTreeWriter(int columnId, ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); + this.isDirectV2 = isNewWriteFormat(writer); MapObjectInspector insp = (MapObjectInspector) inspector; childrenWriters = new TreeWriter[2]; childrenWriters[0] = createTreeWriter(insp.getMapKeyObjectInspector(), writer, true); childrenWriters[1] = createTreeWriter(insp.getMapValueObjectInspector(), writer, true); - lengths = - new RunLengthIntegerWriter(writer.createStream(columnId, - OrcProto.Stream.Kind.LENGTH), false); + lengths = createIntegerWriter(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2); recordPosition(rowIndexPosition); } @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override void write(Object obj) throws IOException { super.write(obj); if (obj != null) { @@ -1346,8 +1456,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static TreeWriter createTreeWriter(ObjectInspector inspector, StreamFactory streamFactory, - boolean nullable - ) throws IOException { + boolean nullable) throws IOException { switch (inspector.getCategory()) { case PRIMITIVE: switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto index 417c37a..d5bea25 100644 --- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto +++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto @@ -73,6 +73,8 @@ message ColumnEncoding { enum Kind { DIRECT = 0; DICTIONARY = 1; + DIRECT_V2 = 2; + DICTIONARY_V2 = 3; } required Kind kind = 1; optional uint32 dictionarySize = 2; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java new file mode 100644 index 0000000..c0f765d --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Random; + +import org.junit.Test; + +import com.google.common.primitives.Longs; + +public class TestBitPack { + + private static final int SIZE = 100; + private static Random rand = new Random(100); + + private long[] deltaEncode(long[] inp) { + long[] output = new long[inp.length]; + for(int i = 0; i < inp.length; i++) { + output[i] = SerializationUtils.zigzagEncode(inp[i]); + } + return output; + } + + private long nextLong(Random rng, long n) { + long bits, val; + do { + bits = (rng.nextLong() << 1) >>> 1; + val = bits % n; + } while (bits - val + (n - 1) < 0L); + return val; + } + + private void runTest(int numBits) throws IOException { + long[] inp = new long[SIZE]; + for(int i = 0; i < SIZE; i++) { + long val = 0; + if (numBits <= 32) { + if (numBits == 1) { + val = -1 * rand.nextInt(2); + } else { + val = rand.nextInt((int) Math.pow(2, numBits - 1)); + } + } else { + val = nextLong(rand, (long) Math.pow(2, numBits - 2)); + } + if (val % 2 == 0) { + val = -val; + } + inp[i] = val; + } + long[] deltaEncoded = deltaEncode(inp); + long minInput = Collections.min(Longs.asList(deltaEncoded)); + long maxInput = Collections.max(Longs.asList(deltaEncoded)); + long rangeInput = maxInput - minInput; + int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput); + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + OutStream output = new OutStream("test", SIZE, null, collect); + SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length, + fixedWidth, output); + output.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + long[] buff = new long[SIZE]; + SerializationUtils.readInts(buff, 0, SIZE, fixedWidth, + InStream.create("test", inBuf, null, SIZE)); + for(int i = 0; i < SIZE; i++) { + buff[i] = SerializationUtils.zigzagDecode(buff[i]); + } + assertEquals(numBits, fixedWidth); + assertArrayEquals(inp, buff); + } + + @Test + public void test01BitPacking1Bit() throws IOException { + runTest(1); + } + + @Test + public void test02BitPacking2Bit() throws IOException { + runTest(2); + } + + @Test + public void test03BitPacking3Bit() throws IOException { + runTest(3); + } + + @Test + public void 
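runTest() above derives the pack width from the zigzag-encoded values before calling writeInts. A small sketch of that width computation in isolation; it is simplified to take the raw bit count of the largest code rather than the range-based FixedBitSizes lookup the test uses, and the names and sample values are illustrative:

    public class PackWidthDemo {
      static long zigzag(long v) { return (v << 1) ^ (v >> 63); }

      static int bitsFor(long value) {
        int n = 0;
        while (value != 0) { n++; value >>>= 1; }
        return Math.max(n, 1);
      }

      public static void main(String[] args) {
        long[] signed = {-4, 3, -1, 0, 2, -3};
        long maxCode = 0;
        for (long v : signed) {
          maxCode = Math.max(maxCode, zigzag(v));
        }
        // zigzag codes are 7, 6, 1, 0, 4, 5 -> 3 bits per value are enough
        System.out.println("pack width = " + bitsFor(maxCode) + " bits");
      }
    }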
test04BitPacking4Bit() throws IOException { + runTest(4); + } + + @Test + public void test05BitPacking5Bit() throws IOException { + runTest(5); + } + + @Test + public void test06BitPacking6Bit() throws IOException { + runTest(6); + } + + @Test + public void test07BitPacking7Bit() throws IOException { + runTest(7); + } + + @Test + public void test08BitPacking8Bit() throws IOException { + runTest(8); + } + + @Test + public void test09BitPacking9Bit() throws IOException { + runTest(9); + } + + @Test + public void test10BitPacking10Bit() throws IOException { + runTest(10); + } + + @Test + public void test11BitPacking11Bit() throws IOException { + runTest(11); + } + + @Test + public void test12BitPacking12Bit() throws IOException { + runTest(12); + } + + @Test + public void test13BitPacking13Bit() throws IOException { + runTest(13); + } + + @Test + public void test14BitPacking14Bit() throws IOException { + runTest(14); + } + + @Test + public void test15BitPacking15Bit() throws IOException { + runTest(15); + } + + @Test + public void test16BitPacking16Bit() throws IOException { + runTest(16); + } + + @Test + public void test17BitPacking17Bit() throws IOException { + runTest(17); + } + + @Test + public void test18BitPacking18Bit() throws IOException { + runTest(18); + } + + @Test + public void test19BitPacking19Bit() throws IOException { + runTest(19); + } + + @Test + public void test20BitPacking20Bit() throws IOException { + runTest(20); + } + + @Test + public void test21BitPacking21Bit() throws IOException { + runTest(21); + } + + @Test + public void test22BitPacking22Bit() throws IOException { + runTest(22); + } + + @Test + public void test23BitPacking23Bit() throws IOException { + runTest(23); + } + + @Test + public void test24BitPacking24Bit() throws IOException { + runTest(24); + } + + @Test + public void test26BitPacking26Bit() throws IOException { + runTest(26); + } + + @Test + public void test28BitPacking28Bit() throws IOException { + runTest(28); + } + + @Test + public void test30BitPacking30Bit() throws IOException { + runTest(30); + } + + @Test + public void test32BitPacking32Bit() throws IOException { + runTest(32); + } + + @Test + public void test40BitPacking40Bit() throws IOException { + runTest(40); + } + + @Test + public void test48BitPacking48Bit() throws IOException { + runTest(48); + } + + @Test + public void test56BitPacking56Bit() throws IOException { + runTest(56); + } + + @Test + public void test64BitPacking64Bit() throws IOException { + runTest(64); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java new file mode 100644 index 0000000..0fb0fcb --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import static junit.framework.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.junit.Test; + +public class TestIntegerCompressionReader { + + public void runSeekTest(CompressionCodec codec) throws Exception { + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2( + new OutStream("test", 1000, codec, collect), true); + TestInStream.PositionCollector[] positions = + new TestInStream.PositionCollector[4096]; + Random random = new Random(99); + int[] junk = new int[2048]; + for(int i=0; i < junk.length; ++i) { + junk[i] = random.nextInt(); + } + for(int i=0; i < 4096; ++i) { + positions[i] = new TestInStream.PositionCollector(); + out.getPosition(positions[i]); + // test runs, incrementing runs, non-runs + if (i < 1024) { + out.write(i/4); + } else if (i < 2048) { + out.write(2*i); + } else { + out.write(junk[i-2048]); + } + } + out.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create + ("test", inBuf, codec, 1000), true); + for(int i=0; i < 2048; ++i) { + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i/4, x); + } else if (i < 2048) { + assertEquals(2*i, x); + } else { + assertEquals(junk[i-2048], x); + } + } + for(int i=2047; i >= 0; --i) { + in.seek(positions[i]); + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i/4, x); + } else if (i < 2048) { + assertEquals(2*i, x); + } else { + assertEquals(junk[i-2048], x); + } + } + } + + @Test + public void testUncompressedSeek() throws Exception { + runSeekTest(null); + } + + @Test + public void testCompressedSeek() throws Exception { + runSeekTest(new ZlibCodec()); + } + + @Test + public void testSkips() throws Exception { + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2( + new OutStream("test", 100, null, collect), true); + for(int i=0; i < 2048; ++i) { + if (i < 1024) { + out.write(i); + } else { + out.write(256 * i); + } + } + out.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create + ("test", inBuf, null, 100), true); + for(int i=0; i < 2048; i += 10) { + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i, x); + } else { + assertEquals(256 * i, x); + } + if (i < 2038) { + in.skip(9); + } + in.skip(0); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java new file mode 100644 index 0000000..4d3183e --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java @@ -0,0 +1,824 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
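TestIntegerCompressionReader above exercises the full life cycle of the new writer/reader pair. Stripped to the essentials, usage looks roughly like the sketch below; it reuses the same package-private classes and the TestInStream helper, so it would have to live in the org.apache.hadoop.hive.ql.io.orc package, and the stream name and buffer size are arbitrary:

    import java.nio.ByteBuffer;

    public class RleV2RoundTripDemo {
      public static void main(String[] args) throws Exception {
        TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
        RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
            new OutStream("demo", 1000, null, collect), true);
        for (long i = 0; i < 100; ++i) {
          out.write(i);                 // an incrementing run, a good fit for DELTA
        }
        out.flush();

        ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
        collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
        inBuf.flip();
        RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(
            InStream.create("demo", inBuf, null, 1000), true);
        in.skip(50);                    // jump over the first 50 values
        System.out.println(in.next());  // prints 50
      }
    }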
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + +public class TestNewIntegerEncoding { + + public static class Row { + Integer int1; + Long long1; + + public Row(int val, long l) { + this.int1 = val; + this.long1 = l; + } + } + + public List fetchData(String path) throws IOException { + List input = new ArrayList(); + FileInputStream stream = new FileInputStream(new File(path)); + try { + FileChannel fc = stream.getChannel(); + MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()); + /* Instead of using default, pass in a decoder. */ + String[] lines = Charset.defaultCharset().decode(bb).toString() + .split("\n"); + for(String line : lines) { + long val = 0; + try { + val = Long.parseLong(line); + } catch (NumberFormatException e) { + // for now lets ignore (assign 0) + } + input.add(val); + } + } finally { + stream.close(); + } + return input; + } + + Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + + File.separator + "test" + File.separator + "tmp")); + + Configuration conf; + FileSystem fs; + Path testFilePath; + String resDir = "ql/src/test/resources"; + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcFile." 
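fetchData() above maps the whole input file and splits it on newlines, counting unparseable lines as 0. For comparison, a roughly equivalent helper written against java.nio.file (Java 7+); the class name is illustrative and the behaviour is only meant to approximate the test helper:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;

    public class FetchDataSketch {
      static List<Long> fetchData(String path) throws IOException {
        List<Long> input = new ArrayList<Long>();
        for (String line : Files.readAllLines(Paths.get(path),
            StandardCharsets.UTF_8)) {
          long val = 0;
          try {
            val = Long.parseLong(line.trim());
          } catch (NumberFormatException e) {
            // unparseable lines count as 0, like the test helper
          }
          input.add(val);
        }
        return input;
      }

      public static void main(String[] args) throws IOException {
        System.out.println(fetchData(args[0]).size() + " values read");
      }
    }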
+ + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } + + @Test + public void testBasicRow() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + writer.addRow(new Row(111, 1111L)); + writer.addRow(new Row(111, 1111L)); + writer.addRow(new Row(111, 1111L)); + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0)); + assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1)); + } + } + + @Test + public void testBasicOld() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, + 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, + 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1, + 1, 1, 1, 1 }; + List input = Lists.newArrayList(Longs.asList(inp)); + conf.set("hive.exec.orc.write.format", "0.11"); + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicNew() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, + 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, + 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1, + 1, 1, 1, 1 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta1() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { -500, -400, -350, -325, -310 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, 
testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta2() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { -500, -600, -650, -675, -710 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta3() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 500, 400, 350, 325, 310 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta4() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 500, 600, 650, 675, 710 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testIntegerMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add((long) Integer.MIN_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.ZLIB, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object 
row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testIntegerMax() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add((long) Integer.MAX_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testLongMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add(Long.MIN_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testLongMax() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add(Long.MAX_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testRandomInt() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for(int i = 0; i < 100000; i++) { + input.add((long) rand.nextInt()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testRandomLong() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, 
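Each of these tests repeats the same write-then-verify cycle with a different input array. Condensed into a single helper it looks like the sketch below; writeAndVerify is not a method in the patch, and like the tests it assumes the org.apache.hadoop.hive.ql.io.orc package so that Writer, Reader, OrcFile, CompressionKind and RecordReader resolve without imports:

    import static junit.framework.Assert.assertEquals;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.io.LongWritable;

    public class WriteAndVerifyHelper {
      static void writeAndVerify(FileSystem fs, Path path, Configuration conf,
          ObjectInspector inspector, List<Long> input) throws Exception {
        Writer writer = OrcFile.createWriter(fs, path, conf, inspector,
            100000, CompressionKind.NONE, 10000, 10000);
        for (Long l : input) {
          writer.addRow(l);
        }
        writer.close();

        Reader reader = OrcFile.createReader(fs, path);
        RecordReader rows = reader.rows(null);
        int idx = 0;
        while (rows.hasNext()) {
          Object row = rows.next(null);
          assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
        }
      }
    }

The interesting part of each test is then only the shape of the input: fixed runs, deltas, random values, or mostly small values with a single large outlier placed at a chosen position.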
ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for(int i = 0; i < 100000; i++) { + input.add(rand.nextLong()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseNegativeMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2, + 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1, + 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1, + 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6, + 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -13, 1, 2, 3, + 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1, + 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4, + 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1, + 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1, + 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1, + 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2, + 2, 16 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseNegativeMin2() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2, + 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1, + 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1, + 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6, + 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -1, 1, 2, 3, + 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1, + 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4, + 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1, + 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1, + 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1, + 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2, + 2, 16 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, + 100000, CompressionKind.NONE, 10000, 10000); + for(Long l : input) { + writer.addRow(l); + } 
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseNegativeMin3() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, 0, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseNegativeMin4() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { 13, 13, 11, 8, 13, 10, 10, 11, 11, 14, 11, 7, 13,
+        12, 12, 11, 15, 12, 12, 9, 8, 10, 13, 11, 8, 6, 5, 6, 11, 7, 15, 10, 7,
+        6, 8, 7, 9, 9, 11, 33, 11, 3, 7, 4, 6, 10, 14, 12, 5, 14, 7, 6 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt0() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(0, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt1() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(1, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt255() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(255, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseAt256() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(256, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBase510() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(510, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBase511() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(511, 20000L);
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+          Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for(int i = 0; i < 100000; i++) {
+      input.add((long) rand.nextInt());
+    }
+    conf.set("hive.exec.orc.write.format", "0.11");
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.NONE, 10000, 10000);
+    for(Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 55555;
+    rows.seekToRow(idx);
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
index 9f989fd..2f7a7f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.io.orc;

 import static junit.framework.Assert.assertEquals;
diff --git ql/src/test/resources/orc-file-dump-dictionary-threshold.out ql/src/test/resources/orc-file-dump-dictionary-threshold.out
index c486f4d..003c132 100644
--- ql/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ ql/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -11,68 +11,68 @@ Statistics:
   Column 3: count: 21000 min: Darkness,-230 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936
 Stripes:
-  Stripe: offset: 3 data: 107035 rows: 4000 tail: 65 index: 217
+  Stripe: offset: 3 data: 102311 rows: 4000 tail: 68 index: 217
     Stream: column 0 section ROW_INDEX start: 3 length 10
     Stream: column 1 section ROW_INDEX start: 13 length 36
     Stream: column 2 section ROW_INDEX start: 49 length 39
     Stream: column 3 section ROW_INDEX start: 88 length 132
-    Stream: column 1 section DATA start: 220 length 18043
-    Stream: column 2 section DATA start: 18263 length 34740
-    Stream: column 3 section DATA start: 53003 length 50887
-    Stream: column 3 section LENGTH start: 103890 length 3365
+    Stream: column 1 section DATA start: 220 length 16022
+    Stream: column 2 section DATA start: 16242 length 32028
+    Stream: column 3 section DATA start: 48270 length 50887
+    Stream: column 3 section LENGTH start: 99157 length 3374
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DIRECT
-  Stripe: offset: 107320 data: 289727 rows: 5000 tail: 65 index: 349
-    Stream: column 0 section ROW_INDEX start: 107320 length 10
-    Stream: column 1 section ROW_INDEX start: 107330 length 36
-    Stream: column 2 section ROW_INDEX start: 107366 length 39
-    Stream: column 3 section ROW_INDEX start: 107405 length 264
-    Stream: column 1 section DATA start: 107669 length 22581
-    Stream: column 2 section DATA start: 130250 length 43426
-    Stream: column 3 section DATA start: 173676 length 219588
-    Stream: column 3 section LENGTH start: 393264 length 4132
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DIRECT_V2
+  Stripe: offset: 102599 data: 284999 rows: 5000 tail: 68 index: 349
+    Stream: column 0 section ROW_INDEX start: 102599 length 10
+    Stream: column 1 section ROW_INDEX start: 102609 length 36
+    Stream: column 2 section ROW_INDEX start: 102645 length 39
+    Stream: column 3 section ROW_INDEX start: 102684 length 264
+    Stream: column 1 section DATA start: 102948 length 20029
+    Stream: column 2 section DATA start: 122977 length 40035
+    Stream: column 3 section DATA start: 163012 length 219588
+    Stream: column 3 section LENGTH start: 382600 length 5347
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DIRECT
-  Stripe: offset: 397461 data: 496162 rows: 5000 tail: 66 index: 536
-    Stream: column 0 section ROW_INDEX start: 397461 length 10
-    Stream: column 1 section ROW_INDEX start: 397471 length 36
-    Stream: column 2 section ROW_INDEX start: 397507 length 39
-    Stream: column 3 section ROW_INDEX start: 397546 length 451
-    Stream: column 1 section DATA start: 397997 length 22605
-    Stream: column 2 section DATA start: 420602 length 43444
-    Stream: column 3 section DATA start: 464046 length 425862
-    Stream: column 3 section LENGTH start: 889908 length 4251
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DIRECT_V2
+  Stripe: offset: 388015 data: 491655 rows: 5000 tail: 69 index: 536
+    Stream: column 0 section ROW_INDEX start: 388015 length 10
+    Stream: column 1 section ROW_INDEX start: 388025 length 36
+    Stream: column 2 section ROW_INDEX start: 388061 length 39
+    Stream: column 3 section ROW_INDEX start: 388100 length 451
+    Stream: column 1 section DATA start: 388551 length 20029
+    Stream: column 2 section DATA start: 408580 length 40035
+    Stream: column 3 section DATA start: 448615 length 425862
+    Stream: column 3 section LENGTH start: 874477 length 5729
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DIRECT
-  Stripe: offset: 894225 data: 711982 rows: 5000 tail: 65 index: 677
-    Stream: column 0 section ROW_INDEX start: 894225 length 10
-    Stream: column 1 section ROW_INDEX start: 894235 length 36
-    Stream: column 2 section ROW_INDEX start: 894271 length 39
-    Stream: column 3 section ROW_INDEX start: 894310 length 592
-    Stream: column 1 section DATA start: 894902 length 22591
-    Stream: column 2 section DATA start: 917493 length 43414
-    Stream: column 3 section DATA start: 960907 length 641580
-    Stream: column 3 section LENGTH start: 1602487 length 4397
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DIRECT_V2
+  Stripe: offset: 880275 data: 707368 rows: 5000 tail: 68 index: 677
+    Stream: column 0 section ROW_INDEX start: 880275 length 10
+    Stream: column 1 section ROW_INDEX start: 880285 length 36
+    Stream: column 2 section ROW_INDEX start: 880321 length 39
+    Stream: column 3 section ROW_INDEX start: 880360 length 592
+    Stream: column 1 section DATA start: 880952 length 20029
+    Stream: column 2 section DATA start: 900981 length 40035
+    Stream: column 3 section DATA start: 941016 length 641580
+    Stream: column 3 section LENGTH start: 1582596 length 5724
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DIRECT
-  Stripe: offset: 1606949 data: 350645 rows: 2000 tail: 66 index: 786
-    Stream: column 0 section ROW_INDEX start: 1606949 length 10
-    Stream: column 1 section ROW_INDEX start: 1606959 length 36
-    Stream: column 2 section ROW_INDEX start: 1606995 length 39
-    Stream: column 3 section ROW_INDEX start: 1607034 length 701
-    Stream: column 1 section DATA start: 1607735 length 9027
-    Stream: column 2 section DATA start: 1616762 length 17375
-    Stream: column 3 section DATA start: 1634137 length 322259
-    Stream: column 3 section LENGTH start: 1956396 length 1984
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DIRECT_V2
+  Stripe: offset: 1588388 data: 348697 rows: 2000 tail: 67 index: 786
+    Stream: column 0 section ROW_INDEX start: 1588388 length 10
+    Stream: column 1 section ROW_INDEX start: 1588398 length 36
+    Stream: column 2 section ROW_INDEX start: 1588434 length 39
+    Stream: column 3 section ROW_INDEX start: 1588473 length 701
+    Stream: column 1 section DATA start: 1589174 length 8011
+    Stream: column 2 section DATA start: 1597185 length 16014
+    Stream: column 3 section DATA start: 1613199 length 322259
+    Stream: column 3 section LENGTH start: 1935458 length 2413
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DIRECT
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DIRECT_V2
\ No newline at end of file
diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out
index 8b88931..b250797 100644
--- ql/src/test/resources/orc-file-dump.out
+++ ql/src/test/resources/orc-file-dump.out
@@ -11,73 +11,73 @@ Statistics:
   Column 3: count: 21000 min: Darkness, max: worst
 Stripes:
-  Stripe: offset: 3 data: 69605 rows: 5000 tail: 72 index: 119
+  Stripe: offset: 3 data: 63766 rows: 5000 tail: 74 index: 119
     Stream: column 0 section ROW_INDEX start: 3 length 10
     Stream: column 1 section ROW_INDEX start: 13 length 35
     Stream: column 2 section ROW_INDEX start: 48 length 39
     Stream: column 3 section ROW_INDEX start: 87 length 35
-    Stream: column 1 section DATA start: 122 length 22605
-    Stream: column 2 section DATA start: 22727 length 43426
-    Stream: column 3 section DATA start: 66153 length 3403
-    Stream: column 3 section LENGTH start: 69556 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 69594 length 133
+    Stream: column 1 section DATA start: 122 length 20029
+    Stream: column 2 section DATA start: 20151 length 40035
+    Stream: column 3 section DATA start: 60186 length 3544
+    Stream: column 3 section LENGTH start: 63730 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 63755 length 133
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 69799 data: 69584 rows: 5000 tail: 73 index: 118
-    Stream: column 0 section ROW_INDEX start: 69799 length 10
-    Stream: column 1 section ROW_INDEX start: 69809 length 34
-    Stream: column 2 section ROW_INDEX start: 69843 length 39
-    Stream: column 3 section ROW_INDEX start: 69882 length 35
-    Stream: column 1 section DATA start: 69917 length 22597
-    Stream: column 2 section DATA start: 92514 length 43439
-    Stream: column 3 section DATA start: 135953 length 3377
-    Stream: column 3 section LENGTH start: 139330 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 139368 length 133
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DICTIONARY_V2
+  Stripe: offset: 63962 data: 63755 rows: 5000 tail: 76 index: 118
+    Stream: column 0 section ROW_INDEX start: 63962 length 10
+    Stream: column 1 section ROW_INDEX start: 63972 length 34
+    Stream: column 2 section ROW_INDEX start: 64006 length 39
+    Stream: column 3 section ROW_INDEX start: 64045 length 35
+    Stream: column 1 section DATA start: 64080 length 20029
+    Stream: column 2 section DATA start: 84109 length 40035
+    Stream: column 3 section DATA start: 124144 length 3533
+    Stream: column 3 section LENGTH start: 127677 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 127702 length 133
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 139574 data: 69570 rows: 5000 tail: 73 index: 120
-    Stream: column 0 section ROW_INDEX start: 139574 length 10
-    Stream: column 1 section ROW_INDEX start: 139584 length 36
-    Stream: column 2 section ROW_INDEX start: 139620 length 39
-    Stream: column 3 section ROW_INDEX start: 139659 length 35
-    Stream: column 1 section DATA start: 139694 length 22594
-    Stream: column 2 section DATA start: 162288 length 43415
-    Stream: column 3 section DATA start: 205703 length 3390
-    Stream: column 3 section LENGTH start: 209093 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 209131 length 133
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DICTIONARY_V2
+  Stripe: offset: 127911 data: 63766 rows: 5000 tail: 76 index: 120
+    Stream: column 0 section ROW_INDEX start: 127911 length 10
+    Stream: column 1 section ROW_INDEX start: 127921 length 36
+    Stream: column 2 section ROW_INDEX start: 127957 length 39
+    Stream: column 3 section ROW_INDEX start: 127996 length 35
+    Stream: column 1 section DATA start: 128031 length 20029
+    Stream: column 2 section DATA start: 148060 length 40035
+    Stream: column 3 section DATA start: 188095 length 3544
+    Stream: column 3 section LENGTH start: 191639 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 191664 length 133
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 209337 data: 69551 rows: 5000 tail: 72 index: 119
-    Stream: column 0 section ROW_INDEX start: 209337 length 10
-    Stream: column 1 section ROW_INDEX start: 209347 length 35
-    Stream: column 2 section ROW_INDEX start: 209382 length 39
-    Stream: column 3 section ROW_INDEX start: 209421 length 35
-    Stream: column 1 section DATA start: 209456 length 22575
-    Stream: column 2 section DATA start: 232031 length 43426
-    Stream: column 3 section DATA start: 275457 length 3379
-    Stream: column 3 section LENGTH start: 278836 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 278874 length 133
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DICTIONARY_V2
+  Stripe: offset: 191873 data: 63796 rows: 5000 tail: 74 index: 119
+    Stream: column 0 section ROW_INDEX start: 191873 length 10
+    Stream: column 1 section ROW_INDEX start: 191883 length 35
+    Stream: column 2 section ROW_INDEX start: 191918 length 39
+    Stream: column 3 section ROW_INDEX start: 191957 length 35
+    Stream: column 1 section DATA start: 191992 length 20029
+    Stream: column 2 section DATA start: 212021 length 40035
+    Stream: column 3 section DATA start: 252056 length 3574
+    Stream: column 3 section LENGTH start: 255630 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 255655 length 133
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 279079 data: 14096 rows: 1000 tail: 68 index: 120
-    Stream: column 0 section ROW_INDEX start: 279079 length 10
-    Stream: column 1 section ROW_INDEX start: 279089 length 36
-    Stream: column 2 section ROW_INDEX start: 279125 length 39
-    Stream: column 3 section ROW_INDEX start: 279164 length 35
-    Stream: column 1 section DATA start: 279199 length 4529
-    Stream: column 2 section DATA start: 283728 length 8690
-    Stream: column 3 section DATA start: 292418 length 706
-    Stream: column 3 section LENGTH start: 293124 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 293162 length 133
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DICTIONARY_V2
+  Stripe: offset: 255862 data: 12940 rows: 1000 tail: 71 index: 120
+    Stream: column 0 section ROW_INDEX start: 255862 length 10
+    Stream: column 1 section ROW_INDEX start: 255872 length 36
+    Stream: column 2 section ROW_INDEX start: 255908 length 39
+    Stream: column 3 section ROW_INDEX start: 255947 length 35
+    Stream: column 1 section DATA start: 255982 length 4007
+    Stream: column 2 section DATA start: 259989 length 8007
+    Stream: column 3 section DATA start: 267996 length 768
+    Stream: column 3 section LENGTH start: 268764 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 268789 length 133
     Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DICTIONARY[35]
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DICTIONARY_V2
\ No newline at end of file