diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java index f6acc00..cce1415 100644 --- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java +++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java @@ -5157,10 +5157,12 @@ public ColumnEncoding getDefaultInstanceForType() { implements com.google.protobuf.ProtocolMessageEnum { DIRECT(0, 0), DICTIONARY(1, 1), + DIRECT_V2(2, 2), ; public static final int DIRECT_VALUE = 0; public static final int DICTIONARY_VALUE = 1; + public static final int DIRECT_V2_VALUE = 2; public final int getNumber() { return value; } @@ -5169,6 +5171,7 @@ public static Kind valueOf(int value) { switch (value) { case 0: return DIRECT; case 1: return DICTIONARY; + case 2: return DIRECT_V2; default: return null; } } @@ -5199,7 +5202,7 @@ public Kind findValueByNumber(int number) { } private static final Kind[] VALUES = { - DIRECT, DICTIONARY, + DIRECT, DICTIONARY, DIRECT_V2, }; public static Kind valueOf( @@ -10568,41 +10571,42 @@ void setMagic(com.google.protobuf.ByteString value) { "nd\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Ki" + "nd\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n" + "\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004" + - "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n\016Colum", + "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\240\001\n\016Colum", "nEncoding\022C\n\004kind\030\001 \002(\01625.org.apache.had" + "oop.hive.ql.io.orc.ColumnEncoding.Kind\022\026" + - "\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006DIRECT" + - "\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFooter\0229\n\007s" + - "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" + - ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap" + - "ache.hadoop.hive.ql.io.orc.ColumnEncodin" + - "g\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" + - "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" + - "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\260\001\n\004Kind", - "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" + - "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" + - "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" + - "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" + - "\007DECIMAL\020\016\"x\n\021StripeInformation\022\016\n\006offse" + - "t\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLeng" + - "th\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014number" + - "OfRows\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name" + - "\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Footer\022\024\n\014head" + - "erLength\030\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n", - "\007stripes\030\003 \003(\01323.org.apache.hadoop.hive." 
+ - "ql.io.orc.StripeInformation\0225\n\005types\030\004 \003" + - "(\0132&.org.apache.hadoop.hive.ql.io.orc.Ty" + - "pe\022D\n\010metadata\030\005 \003(\01322.org.apache.hadoop" + - ".hive.ql.io.orc.UserMetadataItem\022\024\n\014numb" + - "erOfRows\030\006 \001(\004\022F\n\nstatistics\030\007 \003(\01322.org" + - ".apache.hadoop.hive.ql.io.orc.ColumnStat" + - "istics\022\026\n\016rowIndexStride\030\010 \001(\r\"\255\001\n\nPostS" + - "cript\022\024\n\014footerLength\030\001 \001(\004\022F\n\013compressi" + - "on\030\002 \001(\01621.org.apache.hadoop.hive.ql.io.", - "orc.CompressionKind\022\034\n\024compressionBlockS" + - "ize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001\022\016\n\005magic\030" + - "\300> \001(\t*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004Z" + - "LIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003" + "\n\016dictionarySize\030\002 \001(\r\"1\n\004Kind\022\n\n\006DIRECT" + + "\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\"\214\001\n\014St" + + "ripeFooter\0229\n\007streams\030\001 \003(\0132(.org.apache" + + ".hadoop.hive.ql.io.orc.Stream\022A\n\007columns" + + "\030\002 \003(\01320.org.apache.hadoop.hive.ql.io.or" + + "c.ColumnEncoding\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162" + + "+.org.apache.hadoop.hive.ql.io.orc.Type." + + "Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames", + "\030\003 \003(\t\"\260\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t" + + "\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022" + + "\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tT" + + "IMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020" + + "\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\"x\n\021StripeInfor" + + "mation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 " + + "\001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030" + + "\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetad" + + "ataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n" + + "\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontent", + "Length\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apac" + + "he.hadoop.hive.ql.io.orc.StripeInformati" + + "on\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.hi" + + "ve.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.or" + + "g.apache.hadoop.hive.ql.io.orc.UserMetad" + + "ataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatist" + + "ics\030\007 \003(\01322.org.apache.hadoop.hive.ql.io" + + ".orc.ColumnStatistics\022\026\n\016rowIndexStride\030" + + "\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001" + + "(\004\022F\n\013compression\030\002 \001(\01621.org.apache.had", + "oop.hive.ql.io.orc.CompressionKind\022\034\n\024co" + + "mpressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(" + + "\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind" + + "\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020" + + "\003" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java new file mode 100644 index 0000000..14d031c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for reading integers. + */ +interface IntegerReader { + + /** + * Seek to the position provided by index. + * + * @param index + * @throws IOException + */ + void seek(PositionProvider index) throws IOException; + + /** + * Skip number of specified rows. + * + * @param numValues + * @throws IOException + */ + void skip(long numValues) throws IOException; + + /** + * Check if there are any more values left. + * + * @return + * @throws IOException + */ + boolean hasNext() throws IOException; + + /** + * Return the next available value. + * + * @return + * @throws IOException + */ + long next() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java new file mode 100644 index 0000000..ab69d49 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for writing integers. + */ +interface IntegerWriter { + + /** + * Get position from the stream. 
+ * + * @param recorder + * @throws IOException + */ + void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Write the integer value + * + * @param value + * @throws IOException + */ + void write(long value) throws IOException; + + /** + * Flush the buffer + * + * @throws IOException + */ + void flush() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 06103e3..a85f9cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -114,14 +114,18 @@ public long getNext() { protected final int columnId; private BitFieldReader present = null; protected boolean valuePresent = false; + protected boolean isDirectV2; TreeReader(Path path, int columnId) { this.path = path; this.columnId = columnId; + // FIXME: make it user configurable? + this.isDirectV2 = true; } void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { throw new IOException("Unknown encoding " + encoding + " in column " + columnId + " of " + path); } @@ -130,6 +134,9 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { void startStripe(Map streams, List encoding ) throws IOException { + if(encoding.get(columnId).getKind() == OrcProto.ColumnEncoding.Kind.DIRECT) { + this.isDirectV2 = false; + } checkEncoding(encoding.get(columnId)); InStream in = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.PRESENT)); @@ -263,7 +270,7 @@ void skipRows(long items) throws IOException { } private static class ShortTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; ShortTreeReader(Path path, int columnId) { super(path, columnId); @@ -276,7 +283,11 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (super.isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override @@ -307,7 +318,7 @@ void skipRows(long items) throws IOException { } private static class IntTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; IntTreeReader(Path path, int columnId) { super(path, columnId); @@ -320,7 +331,11 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override @@ -351,7 +366,7 @@ void skipRows(long items) throws IOException { } private static class LongTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; LongTreeReader(Path path, int columnId) { super(path, columnId); @@ -364,7 +379,11 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, 
OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override @@ -489,7 +508,7 @@ void skipRows(long items) throws IOException { private static class BinaryTreeReader extends TreeReader{ private InStream stream; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; BinaryTreeReader(Path path, int columnId) { super(path, columnId); @@ -503,9 +522,15 @@ void startStripe(Map streams, StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), - false); + if (isDirectV2) { + lengths = new RunLengthIntegerReaderV2(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } } @Override @@ -544,7 +569,7 @@ Object next(Object previous) throws IOException { void skipRows(long items) throws IOException { items = countNonNulls(items); long lengthToSkip = 0; - for(int i=0; i < items; ++i) { + for (int i = 0; i < items; ++i) { lengthToSkip += lengths.next(); } stream.skip(lengthToSkip); @@ -552,8 +577,8 @@ void skipRows(long items) throws IOException { } private static class TimestampTreeReader extends TreeReader{ - private RunLengthIntegerReader data; - private RunLengthIntegerReader nanos; + private IntegerReader data = null; + private IntegerReader nanos = null; TimestampTreeReader(Path path, int columnId) { super(path, columnId); @@ -564,10 +589,17 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.DATA)), true); - nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.SECONDARY)), false); + if (isDirectV2) { + data = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); + } else { + data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); + } } @Override @@ -587,9 +619,12 @@ Object next(Object previous) throws IOException { } else { result = (Timestamp) previous; } - long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * + long millis = 0; + int newNanos = 0; + millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * WriterImpl.MILLIS_PER_SECOND; - int newNanos = parseNanos(nanos.next()); + newNanos = parseNanos(nanos.next()); + // fix the rounding when we divided by 1000. 
if (millis >= 0) { millis += newNanos / 1000000; @@ -623,7 +658,7 @@ void skipRows(long items) throws IOException { private static class DecimalTreeReader extends TreeReader{ private InStream valueStream; - private RunLengthIntegerReader scaleStream; + private IntegerReader scaleStream = null; DecimalTreeReader(Path path, int columnId) { super(path, columnId); @@ -636,8 +671,13 @@ void startStripe(Map streams, super.startStripe(streams, encodings); valueStream = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA)); - scaleStream = new RunLengthIntegerReader(streams.get( - new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + if (isDirectV2) { + scaleStream = new RunLengthIntegerReaderV2(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } else { + scaleStream = new RunLengthIntegerReader(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } } @Override @@ -671,7 +711,7 @@ void skipRows(long items) throws IOException { private DynamicByteArray dictionaryBuffer = null; private int dictionarySize; private int[] dictionaryOffsets; - private RunLengthIntegerReader reader; + private IntegerReader reader = null; StringTreeReader(Path path, int columnId) { super(path, columnId); @@ -706,13 +746,18 @@ void startStripe(Map streams, // read the lengths name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); in = streams.get(name); - RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); + IntegerReader lenReader = null; + if(isDirectV2) { + lenReader = new RunLengthIntegerReaderV2(in, false); + } else { + lenReader = new RunLengthIntegerReader(in, false); + } int offset = 0; if (dictionaryOffsets == null || dictionaryOffsets.length < dictionarySize + 1) { dictionaryOffsets = new int[dictionarySize + 1]; } - for(int i=0; i < dictionarySize; ++i) { + for (int i = 0; i < dictionarySize; ++i) { dictionaryOffsets[i] = offset; offset += (int) lenReader.next(); } @@ -721,7 +766,11 @@ void startStripe(Map streams, // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), false); + if (isDirectV2) { + reader = new RunLengthIntegerReaderV2(streams.get(name), false); + } else { + reader = new RunLengthIntegerReader(streams.get(name), false); + } } @Override @@ -735,7 +784,8 @@ Object next(Object previous) throws IOException { super.next(previous); Text result = null; if (valuePresent) { - int entry = (int) reader.next(); + int entry = 0; + entry = (int) reader.next(); if (previous == null) { result = new Text(); } else { @@ -919,7 +969,7 @@ void skipRows(long items) throws IOException { private static class ListTreeReader extends TreeReader { private final TreeReader elementReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; ListTreeReader(Path path, int columnId, List types, @@ -972,8 +1022,13 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + if (isDirectV2) { + lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } if (elementReader != null) { elementReader.startStripe(streams, encodings); } @@ -983,7 +1038,7 @@ 
void startStripe(Map streams, void skipRows(long items) throws IOException { items = countNonNulls(items); long childSkip = 0; - for(long i=0; i < items; ++i) { + for (long i = 0; i < items; ++i) { childSkip += lengths.next(); } elementReader.skipRows(childSkip); @@ -993,7 +1048,7 @@ void skipRows(long items) throws IOException { private static class MapTreeReader extends TreeReader { private final TreeReader keyReader; private final TreeReader valueReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; MapTreeReader(Path path, int columnId, @@ -1050,8 +1105,13 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + if (isDirectV2) { + lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } if (keyReader != null) { keyReader.startStripe(streams, encodings); } @@ -1064,7 +1124,7 @@ void startStripe(Map streams, void skipRows(long items) throws IOException { items = countNonNulls(items); long childSkip = 0; - for(long i=0; i < items; ++i) { + for (long i = 0; i < items; ++i) { childSkip += lengths.next(); } keyReader.skipRows(childSkip); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java index 2825c64..9c825eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java @@ -23,7 +23,7 @@ /** * A reader that reads a sequence of integers. * */ -class RunLengthIntegerReader { +class RunLengthIntegerReader implements IntegerReader { private final InStream input; private final boolean signed; private final long[] literals = @@ -71,11 +71,11 @@ private void readValues() throws IOException { } } - boolean hasNext() throws IOException { + public boolean hasNext() throws IOException { return used != numLiterals || input.available() > 0; } - long next() throws IOException { + public long next() throws IOException { long result; if (used == numLiterals) { readValues(); @@ -88,7 +88,7 @@ long next() throws IOException { return result; } - void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); if (consumed != 0) { @@ -104,7 +104,7 @@ void seek(PositionProvider index) throws IOException { } } - void skip(long numValues) throws IOException { + public void skip(long numValues) throws IOException { while (numValues > 0) { if (used == numLiterals) { readValues(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java new file mode 100644 index 0000000..0a8f0e9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; + +/** + * A reader that reads a sequence of light weight compressed integers. Refer + * {@link RunLengthIntegerWriterV2} for description of various lightweight + * compression techniques. + */ +class RunLengthIntegerReaderV2 implements IntegerReader { + private final InStream input; + private final boolean signed; + private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE]; + private int numLiterals = 0; + private int used = 0; + + RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException { + this.input = input; + this.signed = signed; + } + + private void readValues() throws IOException { + // read the first 2 bits and determine the encoding type + int firstByte = input.read(); + if (firstByte < 0) { + throw new EOFException("Read past end of RLE integer from " + input); + } else { + int enc = (firstByte >>> 6) & 0x03; + if (EncodingType.SHORT_REPEAT.ordinal() == enc) { + readShortRepeatValues(firstByte); + } else if (EncodingType.DIRECT.ordinal() == enc) { + readDirectValues(firstByte); + } else if (EncodingType.PATCHED_BASE.ordinal() == enc) { + readPatchedBaseValues(firstByte); + } else { + readDeltaValues(firstByte); + } + } + } + + private void readDeltaValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fb = (firstByte >>> 1) & 0x1f; + if (fb != 0) { + fb = SerializationUtils.decodeBitWidth(fb); + } + + // extract the blob run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + + // read the first value stored as vint + long firstVal = 0; + if (signed) { + firstVal = SerializationUtils.readVslong(input); + } else { + firstVal = SerializationUtils.readVulong(input); + } + + // store first value to result buffer + long prevVal = firstVal; + literals[numLiterals++] = firstVal; + + // if fixed bits is 0 then all values have fixed delta + if (fb == 0) { + // read the fixed delta value stored as vint (deltas can be negative even + // if all number are positive) + long fd = SerializationUtils.readVslong(input); + + // add fixed deltas to adjacent values + for (int i = 0; i < len; i++) { + literals[numLiterals++] = literals[numLiterals - 2] + fd; + } + } else { + long deltaBase = SerializationUtils.readVslong(input); + // add delta base and first value + literals[numLiterals++] = firstVal + deltaBase; + prevVal = literals[numLiterals - 1]; + len -= 1; + + // write the unpacked values, add it to previous value and store final + // value to result buffer. 
if the delta base value is negative then it + // is a decreasing sequence else an increasing sequence + SerializationUtils.readInts(literals, numLiterals, len, fb, input); + while (len > 0) { + if (deltaBase < 0) { + literals[numLiterals] = prevVal - literals[numLiterals]; + } else { + literals[numLiterals] = prevVal + literals[numLiterals]; + } + prevVal = literals[numLiterals]; + len--; + numLiterals++; + } + } + } + + private void readPatchedBaseValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = SerializationUtils.decodeBitWidth(fbo); + + // extract the run length of data blob + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are always one off + len += 1; + + // extract the number of bytes occupied by base + int thirdByte = input.read(); + int bw = (thirdByte >>> 5) & 0x07; + // base width is one off + bw += 1; + + // extract patch width + int pwo = thirdByte & 0x1f; + int pw = SerializationUtils.decodeBitWidth(pwo); + + // read fourth byte and extract patch gap width + int fourthByte = input.read(); + int pgw = (fourthByte >>> 5) & 0x07; + // patch gap width is one off + pgw += 1; + + // extract the length of the patch list + int pl = fourthByte & 0x1f; + + // read the next base width number of bytes to extract base value + long base = SerializationUtils.bytesToLongBE(input, bw); + base = SerializationUtils.zigzagDecode(base); + + // unpack the data blob + long[] unpacked = new long[len]; + SerializationUtils.readInts(unpacked, 0, len, fb, input); + + // unpack the patch blob + long[] unpackedPatch = new long[pl]; + SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input); + + // apply the patch directly when decoding the packed data + int patchIdx = 0; + long currGap = 0; + long currPatch = 0; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + long actualGap = 0; + + // special case: gap is >255 then patch value will be 0. + // if gap is <=255 then patch value cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // unpack data blob, patch it (if required), add base to get final result + for (int i = 0; i < unpacked.length; i++) { + if (i == actualGap) { + // apply patch + long patchedVal = unpacked[i] | (currPatch << fb); + + // add base to patched value and zigzag decode it + literals[numLiterals++] = base + patchedVal; + + // increment the patch to point to next entry in patch list + patchIdx++; + + if (patchIdx < pl) { + // read the next gap and patch + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + actualGap = 0; + + // special case: gap is >255 then patch will be 0. if gap is + // <=255 then patch cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // next gap is relative to the current gap + actualGap += i; + } + } else { + // no patching required. 
add base to unpacked value to get final value + literals[numLiterals++] = base + unpacked[i]; + } + } + + } + + private void readDirectValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = SerializationUtils.decodeBitWidth(fbo); + + // extract the run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are one off + len += 1; + + // write the unpacked values and zigzag decode to result buffer + SerializationUtils.readInts(literals, numLiterals, len, fb, input); + if (signed) { + for (int i = 0; i < len; i++) { + literals[numLiterals] = SerializationUtils.zigzagDecode(literals[numLiterals]); + numLiterals++; + } + } else { + numLiterals += len; + } + } + + private void readShortRepeatValues(int firstByte) throws IOException { + + // read the number of bytes occupied by the value + int size = (firstByte >>> 3) & 0x07; + // #bytes are one off + size += 1; + + // read the run length + int len = firstByte & 0x07; + // run lengths values are stored only after MIN_REPEAT value is met + len += RunLengthIntegerWriterV2.MIN_REPEAT; + + // read the repeated value which is store using fixed bytes + long val = SerializationUtils.bytesToLongBE(input, size); + + if (signed) { + val = SerializationUtils.zigzagDecode(val); + } + + // repeat the value for length times + for (int i = 0; i < len; i++) { + literals[numLiterals++] = val; + } + } + + public boolean hasNext() throws IOException { + return used != numLiterals || input.available() > 0; + } + + public long next() throws IOException { + long result; + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + result = literals[used++]; + return result; + } + + public void seek(PositionProvider index) throws IOException { + input.seek(index); + int consumed = (int) index.getNext(); + if (consumed != 0) { + // a loop is required for cases where we break the run into two + // parts + while (consumed > 0) { + numLiterals = 0; + readValues(); + used = consumed; + consumed -= numLiterals; + } + } else { + used = 0; + numLiterals = 0; + } + } + + public void skip(long numValues) throws IOException { + while (numValues > 0) { + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + long consume = Math.min(numValues, numLiterals - used); + used += consume; + numValues -= consume; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java index aaca0a1..0497a56 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java @@ -25,7 +25,7 @@ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128 * literal vint values follow. 
*/ -class RunLengthIntegerWriter { +class RunLengthIntegerWriter implements IntegerWriter { static final int MIN_REPEAT_SIZE = 3; static final int MAX_DELTA = 127; static final int MIN_DELTA = -128; @@ -71,12 +71,12 @@ private void writeValues() throws IOException { } } - void flush() throws IOException { + public void flush() throws IOException { writeValues(); output.flush(); } - void write(long value) throws IOException { + public void write(long value) throws IOException { if (numLiterals == 0) { literals[numLiterals++] = value; tailRunLength = 1; @@ -130,8 +130,9 @@ void write(long value) throws IOException { } } - void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java new file mode 100644 index 0000000..bbcd887 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java @@ -0,0 +1,808 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * A writer that performs light weight compression over sequence of integers. + *

+ * There are four types of lightweight integer compression:
+ *   - SHORT_REPEAT
+ *   - DIRECT
+ *   - PATCHED_BASE
+ *   - DELTA
+ *
+ * The description and format for these types are as below:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *   - 1 byte header
+ *     - 2 bits for encoding type
+ *     - 3 bits for bytes required for repeating value
+ *     - 3 bits for repeat count (MIN_REPEAT + run length)
+ *   - Blob - repeat value (fixed bytes)
+ *
+ * DIRECT: Used for random integer sequences whose number of bit requirement
+ * doesn't vary a lot.
+ *   - 2 bytes header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *   - Blob - fixed width * run length bits long
+ *
+ * PATCHED_BASE: Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ *   - 4 bytes header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *     - 3rd byte
+ *       - 3 bits for bytes required for base value
+ *       - 5 bits for patch width
+ *     - 4th byte
+ *       - 3 bits for patch gap width
+ *       - 5 bits for patch length
+ *   - Base value - base width * 8 bits
+ *   - Data blob - fixed width * run length
+ *   - Patch blob - (patch width + patch gap width) * patch length
+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences, sequences
+ * with fixed delta values or long repeated sequences.
+ *   - 2 bytes header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *   - Base value - encoded as varint
+ *   - Delta base - encoded as varint
+ *   - Delta blob - only positive values; monotonicity is decided based on
+ *     the sign of delta base
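+ *
+ * For example (an illustrative worked case derived from the reader and writer
+ * below, not an additional part of the format description): for an unsigned
+ * column, a run of five copies of the value 10000 falls under SHORT_REPEAT and
+ * encodes as the three bytes 0x0a 0x27 0x10. The header byte 0x0a carries
+ * encoding 0 (SHORT_REPEAT), 1 + 1 = 2 value bytes and 2 + MIN_REPEAT = 5
+ * repetitions, and the next two bytes hold 10000 in big endian order.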
+ *

+ */ +class RunLengthIntegerWriterV2 implements IntegerWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private static final int MAX_SHORT_REPEAT_SIZE = 8; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding; + private int numLiterals; + private long[] zigzagLiterals; + private long[] baseRedLiterals; + private long[] adjDeltas; + private long fixedDelta; + private int zzBits90p; + private int zzBits100p; + private int brBits95p; + private int brBits100p; + private int bitsDeltaMax; + private int patchWidth; + private int patchGapWidth; + private int patchLength; + private long[] gapVsPatchList; + private long zzMin; + private boolean isFixedDelta; + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this.output = output; + this.signed = signed; + clear(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + int len = 0; + int fb = bitsDeltaMax; + int efb = 0; + + if (isFixedDelta) { + // if the fixed delta is 0 then the sequence is counted as fixed + // run length else as variable run length + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for fixed delta runs. sequence that require + // only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = SerializationUtils.encodeBitWidth(fb); + efb = efb << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + SerializationUtils.writeVslong(output, literals[0]); + } else { + SerializationUtils.writeVulong(output, literals[0]); + } + + // if delta is fixed then we don't need to store delta blob else store + // delta blob using fixed bit packing + if (isFixedDelta) { + // store the fixed delta using zigzag encoding. 
deltas can be negative + // even if all values are positive + SerializationUtils.writeVslong(output, fixedDelta); + } else { + // bit pack the delta blob + SerializationUtils.writeVslong(output, adjDeltas[0]); + SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, output); + } + } + + private void writePatchedBaseValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int fb = brBits95p; + int efb = SerializationUtils.encodeBitWidth(fb) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = variableRunLength & 0xff; + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width + int baseWidth = SerializationUtils.findClosestNumBits(zzMin); + int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1; + int bb = (baseBytes - 1) << 5; + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for (int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((zzMin >>> (i * 8)) & 0xff); + output.write(b); + } + + // bit packing the delta values and write each bytes + int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p); + SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, closestFixedBits, + output); + + // write the patch blob + closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth + + patchWidth); + SerializationUtils + .writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, output); + + // reset run length + variableRunLength = 0; + } + + private int getOpcode() { + return encoding.ordinal() << 6; + } + + private void writeDirectValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of + // runlength + int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit packing the delta values and write each bytes + SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, zzBits100p, output); + + // reset run length + variableRunLength = 0; + } + + private void writeShortRepeatValues() throws IOException { + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = SerializationUtils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + int 
numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal); + int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // if the runs are long or too short and if the delta is non zero, then + // choose a different algorithm + if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH + && numBytesRepeatVal <= MAX_SHORT_REPEAT_SIZE && prevDelta == 0) { + // write encoding type in top 2 bits + int header = getOpcode(); + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the payload (i.e. the repeat value) in big endian + for (int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } else { + determineEncoding(); + writeValues(); + } + } + + private void determineEncoding() { + // used for direct encoding + zigzagLiterals = new long[numLiterals]; + + // used for patched base encoding + baseRedLiterals = new long[numLiterals]; + + // used for delta encoding + adjDeltas = new long[numLiterals - 1]; + + int idx = 0; + + // for identifying monotonic sequences + boolean isIncreasing = false; + int increasingCount = 1; + boolean isDecreasing = false; + int decreasingCount = 1; + + // for identifying type of delta encoding + long currMin = literals[0]; + long currMax = literals[0]; + isFixedDelta = true; + long currDelta = 0; + + if (signed) { + zzMin = SerializationUtils.zigzagEncode(literals[0]); + } else { + zzMin = literals[0]; + } + long deltaMax = 0; + + // populate all variables to identify the encoding type + if (numLiterals >= 1) { + currDelta = literals[1] - literals[0]; + for (int i = 0; i < numLiterals; i++) { + if (i > 0 && literals[i] >= currMax) { + currMax = literals[i]; + increasingCount++; + } + + if (i > 0 && literals[i] <= currMin) { + currMin = literals[i]; + decreasingCount++; + } + + // if delta doesn't changes then mark it as fixed delta + if (i > 0 && isFixedDelta) { + if (literals[i] - literals[i - 1] != currDelta) { + isFixedDelta = false; + } + + fixedDelta = currDelta; + } + + // store the minimum value among zigzag encoded values. 
The min + // value (base) will be removed in patched base encoding + long zzEncVal = 0; + if (signed) { + zzEncVal = SerializationUtils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + if (zzEncVal < zzMin) { + zzMin = zzEncVal; + } + zigzagLiterals[idx] = zzEncVal; + idx++; + + // max delta value is required for computing the fixed bits + // required for delta blob in delta encoding + if (i > 0) { + if (i == 1) { + // first value preserve the sign + adjDeltas[i - 1] = literals[i] - literals[i - 1]; + } else { + adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]); + if (adjDeltas[i - 1] > deltaMax) { + deltaMax = adjDeltas[i - 1]; + } + } + } + } + + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax); + + // if decreasing count equals total number of literals then the + // sequence is monotonically decreasing + if (increasingCount == 1 && decreasingCount == numLiterals) { + isDecreasing = true; + } + + // if increasing count equals total number of literals then the + // sequence is monotonically increasing + if (decreasingCount == 1 && increasingCount == numLiterals) { + isIncreasing = true; + } + } + + // if the sequence is both increasing and decreasing then it is not + // monotonic + if (isDecreasing && isIncreasing) { + isDecreasing = false; + isIncreasing = false; + } + + // fixed delta condition + if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) { + encoding = EncodingType.DELTA; + return; + } + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct or delta encoding + + double p = 0.9; + zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p); + + p = 1.0; + zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p); + + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1 + && isFixedDelta == false) { + // patching is done only on base reduced values. + // remove base from literals + for (int i = 0; i < zigzagLiterals.length; i++) { + baseRedLiterals[i] = literals[i] - currMin; + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + p = 0.95; + brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p); + + // 100th percentile is used to compute the max patch width + p = 1.0; + brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. 
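+ // (for example, if 95% of the base reduced values fit in 8 bits but the
+ // widest value needs 20 bits, the data blob is written with 8 bits per value
+ // and only the outliers store their upper 12 bits in the patch blob)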
+ if ((brBits100p - brBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } + + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. Hence we will fallback to direct + if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 + && isFixedDelta == false) { + encoding = EncodingType.DIRECT; + return; + } + + // this should not happen + if (encoding == null) { + throw new RuntimeException("Integer encoding cannot be determined."); + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + int mask = (1 << brBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05)); + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = brBits100p - brBits95p; + patchWidth = SerializationUtils.getClosestFixedBits(patchWidth); + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for (int i = 0; i < baseRedLiterals.length; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask + long patch = baseRedLiterals[i] >>> brBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + patchGapWidth = SerializationUtils.findClosestNumBits(maxGap); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. 
To deal with this case, we will save + // two entries in final patch list with following entries + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for (int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255 << patchWidth) | 0; + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + encoding = null; + prevDelta = 0; + zigzagLiterals = null; + baseRedLiterals = null; + adjDeltas = null; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + brBits95p = 0; + brBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + zzMin = 0; + isFixedDelta = false; + } + + public void flush() throws IOException { + // if only one element is left in buffer then use short repeat encoding + if (numLiterals == 1) { + encoding = EncodingType.SHORT_REPEAT; + fixedRunLength = 1; + writeValues(); + } + + // if variable runs are not 0 then determine the delta encoding to use + // and flush out the buffer + if (variableRunLength != 0 && numLiterals != 0) { + determineEncoding(); + writeValues(); + } + + // if fixed runs are not 0 then flush out the buffer + if (fixedRunLength != 0 && numLiterals != 0) { + if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } + } + + output.flush(); + } + + public void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = new long[MIN_REPEAT]; + System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for (long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + 
determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and if satisfies the + // short repeat algorithm conditions then write the values + // as short repeats else determine the appropriate algorithm + // to persist the values + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + determineEncoding(); + writeValues(); + } + } + + // if fixed run length is 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reach the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java index 67762b5..5dc8263 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java @@ -26,8 +26,16 @@ final class SerializationUtils { + enum FixedBitSizes { + ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, + THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, TWENTY, + TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, TWENTYEIGHT, THIRTY, + THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR; + } + // unused - private SerializationUtils() {} + private SerializationUtils() { + } static void writeVulong(OutputStream output, long value) throws IOException { while (true) { @@ -42,7 +50,7 @@ static void writeVulong(OutputStream output, long value) throws IOException { } static void writeVslong(OutputStream output, long value) throws IOException { - writeVulong(output, (value << 1) ^ (value >> 63)); + writeVulong(output, zigzagEncode(value)); } @@ -63,12 +71,12 @@ static long readVulong(InputStream in) throws IOException { static long readVslong(InputStream in) throws IOException { long result = readVulong(in); - return (result >>> 1) ^ -(result & 1); + return zigzagDecode(result); } static float readFloat(InputStream in) throws IOException { int ser = in.read() | (in.read() << 8) | (in.read() << 16) | - (in.read() << 24); + (in.read() << 24); return Float.intBitsToFloat(ser); } @@ -81,19 +89,19 @@ static void writeFloat(OutputStream output, float value) throws IOException { } static double readDouble(InputStream in) throws IOException { - long ser = (long) in.read() | - ((long) in.read() << 8) | - ((long) in.read() << 16) | - ((long) in.read() << 24) | - ((long) in.read() << 32) | - ((long) in.read() << 40) | - ((long) in.read() << 48) | - ((long) in.read() << 56); + long ser = (long) in.read() | + ((long) in.read() << 8) | + ((long) in.read() << 16) | + ((long) in.read() << 24) | + ((long) in.read() << 32) | + ((long) in.read() << 40) | + ((long) 
in.read() << 48) | + ((long) in.read() << 56); return Double.longBitsToDouble(ser); } static void writeDouble(OutputStream output, - double value) throws IOException { + double value) throws IOException { long ser = Double.doubleToLongBits(value); output.write(((int) ser) & 0xff); output.write(((int) (ser >> 8)) & 0xff); @@ -107,19 +115,22 @@ static void writeDouble(OutputStream output, /** * Write the arbitrarily sized signed BigInteger in vint format. - * + * * Signed integers are encoded using the low bit as the sign bit using zigzag * encoding. - * + * * Each byte uses the low 7 bits for data and the high bit for stop/continue. - * + * * Bytes are stored LSB first. - * @param output the stream to write to - * @param value the value to output + * + * @param output + * the stream to write to + * @param value + * the value to output * @throws IOException */ static void writeBigInteger(OutputStream output, - BigInteger value) throws IOException { + BigInteger value) throws IOException { // encode the signed number as a positive integer value = value.shiftLeft(1); int sign = value.signum(); @@ -132,7 +143,7 @@ static void writeBigInteger(OutputStream output, long lowBits = value.longValue() & 0x7fffffffffffffffL; length -= 63; // write out the next 63 bits worth of data - for(int i=0; i < 9; ++i) { + for (int i = 0; i < 9; ++i) { // if this is the last byte, leave the high bit off if (length <= 0 && (lowBits & ~0x7f) == 0) { output.write((byte) lowBits); @@ -148,7 +159,9 @@ static void writeBigInteger(OutputStream output, /** * Read the signed arbitrary sized BigInteger BigInteger in vint format - * @param input the stream to read from + * + * @param input + * the stream to read from * @return the read BigInteger * @throws IOException */ @@ -169,12 +182,12 @@ static BigInteger readBigInteger(InputStream input) throws IOException { result = BigInteger.valueOf(work); work = 0; } else if (offset % 63 == 0) { - result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63)); + result = result.or(BigInteger.valueOf(work).shiftLeft(offset - 63)); work = 0; } } while (b >= 0x80); if (work != 0) { - result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63)); + result = result.or(BigInteger.valueOf(work).shiftLeft((offset / 63) * 63)); } // convert back to a signed number boolean isNegative = result.testBit(0); @@ -185,4 +198,299 @@ static BigInteger readBigInteger(InputStream input) throws IOException { result = result.shiftRight(1); return result; } + + + /** + * Count the number of bits required to encode the given value + * + * @param value + * @return bits required to store value + */ + static int findClosestNumBits(long value) { + int count = 0; + while (value > 0) { + count++; + value = value >>> 1; + } + return getClosestFixedBits(count); + } + + /** + * zigzag encode the given value + * + * @param val + * @return zigzag encoded value + */ + static long zigzagEncode(long val) { + return (val << 1) ^ (val >> 63); + } + + /** + * zigzag decode the given value + * + * @param val + * @return zizag decoded value + */ + static long zigzagDecode(long val) { + return (val >>> 1) ^ -(val & 1); + } + + /** + * Compute the bits required to represent pth percentile value + * + * @param data + * - array + * @param p + * - percentile value (>=0.0 to <=1.0) + * @return pth percentile bits + */ + static int percentileBits(long[] data, double p) { + if ((p > 1.0) || (p <= 0.0)) { + return -1; + } + + // histogram that store the encoded bit requirement for each values. 
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes) + int[] hist = new int[32]; + + // compute the histogram + for (long l : data) { + int idx = encodeBitWidth(findClosestNumBits(l)); + hist[idx] += 1; + } + + int len = data.length; + int perLen = (int) (len * (1.0 - p)); + + // return the bits required by pth percentile length + for (int i = hist.length - 1; i >= 0; i--) { + perLen -= hist[i]; + if (perLen < 0) { + return decodeBitWidth(i); + } + } + + return 0; + } + + /** + * Read n bytes in big endian order and convert to long + * + * @param b + * - byte array + * @return long value + */ + static long bytesToLongBE(InStream input, int n) throws IOException { + long out = 0; + long val = 0; + while (n > 0) { + n--; + // store it in a long and then shift else integer overflow will occur + val = input.read(); + out |= (val << (n * 8)); + } + return out; + } + + /** + * Calculate the number of bytes required + * + * @param n + * - number of values + * @param numBits + * - bit width + * @return number of bytes required + */ + static int getTotalBytesRequired(int n, int numBits) { + return (n * numBits + 7) / 8; + } + + /** + * For a given fixed bit this function will return the closest available fixed + * bit + * + * @param n + * @return closest valid fixed bit + */ + static int getClosestFixedBits(int n) { + if (n == 0) { + return 1; + } + + if (n >= 1 && n <= 24) { + return n; + } else if (n > 24 && n <= 26) { + return 26; + } else if (n > 26 && n <= 28) { + return 28; + } else if (n > 28 && n <= 30) { + return 30; + } else if (n > 30 && n <= 32) { + return 32; + } else if (n > 32 && n <= 40) { + return 40; + } else if (n > 40 && n <= 48) { + return 48; + } else if (n > 48 && n <= 56) { + return 56; + } else { + return 64; + } + } + + /** + * Finds the closest available fixed bit width match and returns its encoded + * value (ordinal) + * + * @param n + * - fixed bit width to encode + * @return encoded fixed bit width + */ + static int encodeBitWidth(int n) { + n = getClosestFixedBits(n); + + if (n >= 1 && n <= 24) { + return n - 1; + } else if (n > 24 && n <= 26) { + return FixedBitSizes.TWENTYSIX.ordinal(); + } else if (n > 26 && n <= 28) { + return FixedBitSizes.TWENTYEIGHT.ordinal(); + } else if (n > 28 && n <= 30) { + return FixedBitSizes.THIRTY.ordinal(); + } else if (n > 30 && n <= 32) { + return FixedBitSizes.THIRTYTWO.ordinal(); + } else if (n > 32 && n <= 40) { + return FixedBitSizes.FORTY.ordinal(); + } else if (n > 40 && n <= 48) { + return FixedBitSizes.FORTYEIGHT.ordinal(); + } else if (n > 48 && n <= 56) { + return FixedBitSizes.FIFTYSIX.ordinal(); + } else { + return FixedBitSizes.SIXTYFOUR.ordinal(); + } + } + + /** + * Decodes the ordinal fixed bit value to actual fixed bit width value + * + * @param n + * - encoded fixed bit width + * @return decoded fixed bit width + */ + static int decodeBitWidth(int n) { + if (n >= FixedBitSizes.ONE.ordinal() && n <= FixedBitSizes.TWENTYFOUR.ordinal()) { + return n + 1; + } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) { + return 26; + } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) { + return 28; + } else if (n == FixedBitSizes.THIRTY.ordinal()) { + return 30; + } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) { + return 32; + } else if (n == FixedBitSizes.FORTY.ordinal()) { + return 40; + } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) { + return 48; + } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) { + return 56; + } else { + return 64; + } + } + + /** + * Bitpack and write the input values to 
underlying output stream + * + * @param input + * - values to write + * @param offset + * - offset + * @param len + * - length + * @param bitSize + * - bit width + * @param output + * - output stream + * @throws IOException + */ + static void writeInts(long[] input, int offset, int len, int bitSize, OutputStream output) + throws IOException { + if (input == null || input.length < 1 || offset < 0 || len < 1 || bitSize < 1) { + return; + } + + int bitsLeft = 8; + byte current = 0; + for (int i = offset; i < (offset + len); i++) { + long value = input[i]; + int bitsToWrite = bitSize; + while (bitsToWrite > bitsLeft) { + // add the bits to the bottom of the current word + current |= value >>> (bitsToWrite - bitsLeft); + // subtract out the bits we just added + bitsToWrite -= bitsLeft; + // zero out the bits above bitsToWrite + value &= (1L << bitsToWrite) - 1; + output.write(current); + current = 0; + bitsLeft = 8; + } + bitsLeft -= bitsToWrite; + current |= value << bitsLeft; + if (bitsLeft == 0) { + output.write(current); + current = 0; + bitsLeft = 8; + } + } + + // flush + if (bitsLeft != 8) { + output.write(current); + current = 0; + bitsLeft = 8; + } + } + + /** + * Read bitpacked integers from input stream + * + * @param buffer + * - input buffer + * @param offset + * - offset + * @param len + * - length + * @param bitSize + * - bit width + * @param input + * - input stream + * @throws IOException + */ + static void readInts(long[] buffer, int offset, int len, int bitSize, InStream input) + throws IOException { + int bitsLeft = 0; + int current = 0; + + for (int i = offset; i < (offset + len); i++) { + long result = 0; + int bitsLeftToRead = bitSize; + while (bitsLeftToRead > bitsLeft) { + result <<= bitsLeft; + result |= current & ((1 << bitsLeft) - 1); + bitsLeftToRead -= bitsLeft; + current = input.read(); + bitsLeft = 8; + } + + // handle the left over bits + if (bitsLeftToRead > 0) { + result <<= bitsLeftToRead; + bitsLeft -= bitsLeftToRead; + result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1); + } + buffer[i] = result; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 52defb9..64d9a9b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -364,6 +364,7 @@ public boolean isCompressed() { private final PositionedOutputStream rowIndexStream; private boolean foundNulls; private OutStream isPresentOutStream; + protected final boolean useDirectV2Encoding; /** * Create a tree writer. @@ -379,6 +380,9 @@ public boolean isCompressed() { this.isCompressed = streamFactory.isCompressed(); this.id = columnId; this.inspector = inspector; + // make direct v2 encoding as default + // FIXME: do we need to make this user configurable? 
+ this.useDirectV2Encoding = true; if (nullable) { isPresentOutStream = streamFactory.createStream(id, OrcProto.Stream.Kind.PRESENT); @@ -493,6 +497,10 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, * @return the information about the encoding of this column */ OrcProto.ColumnEncoding getEncoding() { + if (useDirectV2Encoding) { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } return OrcProto.ColumnEncoding.newBuilder().setKind( OrcProto.ColumnEncoding.Kind.DIRECT).build(); } @@ -618,7 +626,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { } private static class IntegerTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter writer; + private final IntegerWriter writer; private final ShortObjectInspector shortInspector; private final IntObjectInspector intInspector; private final LongObjectInspector longInspector; @@ -630,7 +638,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.writer = new RunLengthIntegerWriter(out, true); + if (super.useDirectV2Encoding) { + this.writer = new RunLengthIntegerWriterV2(out, true); + } else { + this.writer = new RunLengthIntegerWriter(out, true); + } if (inspector instanceof IntObjectInspector) { intInspector = (IntObjectInspector) inspector; shortInspector = null; @@ -759,8 +771,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class StringTreeWriter extends TreeWriter { private static final int INITIAL_DICTIONARY_SIZE = 4096; private final PositionedOutputStream stringOutput; - private final RunLengthIntegerWriter lengthOutput; - private final RunLengthIntegerWriter rowOutput; + private final IntegerWriter lengthOutput; + private final IntegerWriter rowOutput; private final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); private final DynamicIntArray rows = new DynamicIntArray(); @@ -776,10 +788,17 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); - lengthOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false); - rowOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), false); + if (super.useDirectV2Encoding == true) { + lengthOutput = new RunLengthIntegerWriterV2(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + rowOutput = new RunLengthIntegerWriterV2(writer.createStream(id, + OrcProto.Stream.Kind.DATA), false); + } else { + lengthOutput = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + rowOutput = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), false); + } recordPosition(rowIndexPosition); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); @@ -881,7 +900,7 @@ long estimateMemory() { private static class BinaryTreeWriter extends TreeWriter { private final PositionedOutputStream stream; - private final RunLengthIntegerWriter length; + private final IntegerWriter length; BinaryTreeWriter(int columnId, ObjectInspector inspector, @@ -890,8 +909,13 @@ long estimateMemory() { super(columnId, inspector, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.length = 
new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false); + if (super.useDirectV2Encoding) { + this.length = new RunLengthIntegerWriterV2(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + } else { + this.length = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + } recordPosition(rowIndexPosition); } @@ -928,18 +952,25 @@ void recordPosition(PositionRecorder recorder) throws IOException { Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND; private static class TimestampTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter seconds; - private final RunLengthIntegerWriter nanos; + private final IntegerWriter seconds; + private final IntegerWriter nanos; TimestampTreeWriter(int columnId, ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); - this.seconds = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), true); - this.nanos = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), false); + if (super.useDirectV2Encoding) { + this.seconds = new RunLengthIntegerWriterV2(writer.createStream(id, + OrcProto.Stream.Kind.DATA), true); + this.nanos = new RunLengthIntegerWriterV2(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), false); + } else { + this.seconds = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), true); + this.nanos = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), false); + } recordPosition(rowIndexPosition); } @@ -990,7 +1021,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class DecimalTreeWriter extends TreeWriter { private final PositionedOutputStream valueStream; - private final RunLengthIntegerWriter scaleStream; + private final IntegerWriter scaleStream; DecimalTreeWriter(int columnId, ObjectInspector inspector, @@ -998,8 +1029,13 @@ void recordPosition(PositionRecorder recorder) throws IOException { boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); - scaleStream = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), true); + if (super.useDirectV2Encoding) { + this.scaleStream = new RunLengthIntegerWriterV2(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), true); + } else { + this.scaleStream = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), true); + } recordPosition(rowIndexPosition); } @@ -1076,7 +1112,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, } private static class ListTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter lengths; + private final IntegerWriter lengths; ListTreeWriter(int columnId, ObjectInspector inspector, @@ -1088,9 +1124,13 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, childrenWriters[0] = createTreeWriter(listObjectInspector.getListElementObjectInspector(), writer, true); - lengths = - new RunLengthIntegerWriter(writer.createStream(columnId, + if (super.useDirectV2Encoding) { + lengths = new RunLengthIntegerWriterV2(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false); + } else { + lengths = new RunLengthIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false); + } recordPosition(rowIndexPosition); } @@ 
-1126,7 +1166,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { } private static class MapTreeWriter extends TreeWriter { - private final RunLengthIntegerWriter lengths; + private final IntegerWriter lengths; MapTreeWriter(int columnId, ObjectInspector inspector, @@ -1139,9 +1179,15 @@ void recordPosition(PositionRecorder recorder) throws IOException { createTreeWriter(insp.getMapKeyObjectInspector(), writer, true); childrenWriters[1] = createTreeWriter(insp.getMapValueObjectInspector(), writer, true); - lengths = - new RunLengthIntegerWriter(writer.createStream(columnId, - OrcProto.Stream.Kind.LENGTH), false); + if (super.useDirectV2Encoding) { + lengths = + new RunLengthIntegerWriterV2(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false); + } else { + lengths = + new RunLengthIntegerWriter(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false); + } recordPosition(rowIndexPosition); } diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto index d19bc06..994be70 100644 --- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto +++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto @@ -66,6 +66,7 @@ message ColumnEncoding { enum Kind { DIRECT = 0; DICTIONARY = 1; + DIRECT_V2 = 2; } required Kind kind = 1; optional uint32 dictionarySize = 2; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java index 4e2c59f..1ad8661 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java @@ -128,7 +128,7 @@ public void testSkips() throws Exception { BitFieldReader in = new BitFieldReader(InStream.create ("test", inBuf, null, 100), 1); for(int i=0; i < COUNT; i += 5) { - int x = (int) in.next(); + int x = in.next(); if (i < COUNT/2) { assertEquals(i & 1, x); } else { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java new file mode 100644 index 0000000..39ab9df --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.orc; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Random; + +import org.junit.Test; + +import com.google.common.primitives.Longs; + +public class TestBitPack { + + private static final int SIZE = 100; + private static Random rand = new Random(100); + + private long[] deltaEncode(long[] inp) { + long[] output = new long[inp.length]; + for (int i = 0; i < inp.length; i++) { + output[i] = SerializationUtils.zigzagEncode(inp[i]); + } + return output; + } + + private long nextLong(Random rng, long n) { + long bits, val; + do { + bits = (rng.nextLong() << 1) >>> 1; + val = bits % n; + } while (bits - val + (n - 1) < 0L); + return val; + } + + private void runTest(int numBits) throws IOException { + long[] inp = new long[SIZE]; + for (int i = 0; i < SIZE; i++) { + long val = 0; + if (numBits <= 32) { + if(numBits == 1) { + val = -1 * rand.nextInt(2); + } else { + val = rand.nextInt((int) Math.pow(2, numBits - 1)); + } + } else { + val = nextLong(rand, (long) Math.pow(2, numBits - 2)); + } + if (val % 2 == 0) { + val = -val; + } + inp[i] = val; + } + long[] deltaEncoded = deltaEncode(inp); + long minInput = Collections.min(Longs.asList(deltaEncoded)); + long maxInput = Collections.max(Longs.asList(deltaEncoded)); + long rangeInput = maxInput - minInput; + int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput); + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + OutStream output = new OutStream("test", SIZE, null, collect); + SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output); + output.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + long[] buff = new long[SIZE]; + SerializationUtils.readInts(buff, 0, SIZE, fixedWidth, InStream.create + ("test", inBuf, null, SIZE)); + for(int i=0; i= 0; --i) { + in.seek(positions[i]); + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i/4, x); + } else if (i < 2048) { + assertEquals(2*i, x); + } else { + assertEquals(junk[i-2048], x); + } + } + } + + @Test + public void testUncompressedSeek() throws Exception { + runSeekTest(null); + } + + @Test + public void testCompressedSeek() throws Exception { + runSeekTest(new ZlibCodec()); + } + + @Test + public void testSkips() throws Exception { + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2( + new OutStream("test", 100, null, collect), true); + for(int i=0; i < 2048; ++i) { + if (i < 1024) { + out.write(i); + } else { + out.write(256 * i); + } + } + out.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create + ("test", inBuf, null, 100), true); + for(int i=0; i < 2048; i += 10) { + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i, x); + } else { + assertEquals(256 * i, x); + } + if (i < 2038) { + in.skip(9); + } + in.skip(0); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java new file mode 100644 index 0000000..f7ea0c6 --- /dev/null +++ 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java @@ -0,0 +1,650 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + +public class TestNewIntegerEncoding { + + public static class Row { + Integer int1; + Long long1; + + public Row(int val, long l) { + this.int1 = val; + this.long1 = l; + } + } + + public List fetchData(String path) throws IOException { + List input = new ArrayList(); + FileInputStream stream = new FileInputStream(new File(path)); + try { + FileChannel fc = stream.getChannel(); + MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()); + /* Instead of using default, pass in a decoder. */ + String[] lines = Charset.defaultCharset().decode(bb).toString() + .split("\n"); + for (String line : lines) { + long val = 0; + try { + val = Long.parseLong(line); + } catch (NumberFormatException e) { + // for now lets ignore (assign 0) + } + input.add(val); + } + } finally { + stream.close(); + } + return input; + } + + Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + + File.separator + "test" + File.separator + "tmp")); + + Configuration conf; + FileSystem fs; + Path testFilePath; + String resDir = "ql/src/test/resources"; + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcFile." 
+ + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } + + @Test + public void testBasicRow() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + writer.addRow(new Row(111, 1111L)); + writer.addRow(new Row(111, 1111L)); + writer.addRow(new Row(111, 1111L)); + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0)); + assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1)); + } + } + + @Test + public void testBasic() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, + 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, + 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1, + 1, 1, 1, 1}; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta1() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] {-500, -400, -350, -325, -310}; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta2() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] {-500, -600, -650, -675, -710}; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = 
rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta3() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] {500, 400, 350, 325, 310}; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testBasicDelta4() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] {500, 600, 650, 675, 710}; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testIntegerMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add((long) Integer.MIN_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testIntegerMax() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add((long) Integer.MAX_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testLongMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, 
ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add(Long.MIN_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testLongMax() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add(Long.MAX_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testRandomInt() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 100000; i++) { + input.add((long) rand.nextInt()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testRandomLong() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 100000; i++) { + input.add(rand.nextLong()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt0() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(0, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, 
conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt1() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(1, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt255() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(255, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt256() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(256, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBase510() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(510, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, 
conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBase511() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(511, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testSeek() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 100000; i++) { + input.add((long) rand.nextInt()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 55555; + rows.seekToRow(idx); + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java index 4153a61..2f0dc17 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java @@ -18,6 +18,21 @@ package org.apache.hadoop.hive.ql.io.orc; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,20 +66,6 @@ import org.junit.Test; import org.junit.rules.TestName; -import java.io.File; -import java.io.IOException; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import static junit.framework.Assert.*; -import static junit.framework.Assert.assertEquals; - /** * Tests for the top level reader/streamFactory of 
ORC files. */ diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java index 9f989fd..2f7a7f1 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.io.orc; import static junit.framework.Assert.assertEquals; diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out index 8b88931..fd5e617 100644 --- ql/src/test/resources/orc-file-dump.out +++ ql/src/test/resources/orc-file-dump.out @@ -11,73 +11,73 @@ Statistics: Column 3: count: 21000 min: Darkness, max: worst Stripes: - Stripe: offset: 3 data: 69605 rows: 5000 tail: 72 index: 119 + Stripe: offset: 3 data: 52886 rows: 4143 tail: 72 index: 119 Stream: column 0 section ROW_INDEX start: 3 length 10 Stream: column 1 section ROW_INDEX start: 13 length 35 Stream: column 2 section ROW_INDEX start: 48 length 39 Stream: column 3 section ROW_INDEX start: 87 length 35 - Stream: column 1 section DATA start: 122 length 22605 - Stream: column 2 section DATA start: 22727 length 43426 - Stream: column 3 section DATA start: 66153 length 3403 - Stream: column 3 section LENGTH start: 69556 length 38 - Stream: column 3 section DICTIONARY_DATA start: 69594 length 133 - Encoding column 0: DIRECT - Encoding column 1: DIRECT - Encoding column 2: DIRECT + Stream: column 1 section DATA start: 122 length 16596 + Stream: column 2 section DATA start: 16718 length 33174 + Stream: column 3 section DATA start: 49892 length 2958 + Stream: column 3 section LENGTH start: 52850 length 25 + Stream: column 3 section DICTIONARY_DATA start: 52875 length 133 + Encoding column 0: DIRECT_V2 + Encoding column 1: DIRECT_V2 + Encoding column 2: DIRECT_V2 Encoding column 3: DICTIONARY[35] - Stripe: offset: 69799 data: 69584 rows: 5000 tail: 73 index: 118 - Stream: column 0 section ROW_INDEX start: 69799 length 10 - Stream: column 1 section ROW_INDEX start: 69809 length 34 - Stream: column 2 section ROW_INDEX start: 69843 length 39 - Stream: column 3 section ROW_INDEX start: 69882 length 35 - Stream: column 1 section DATA start: 69917 length 22597 - Stream: column 2 section DATA start: 92514 length 43439 - Stream: column 3 section DATA start: 135953 length 3377 - Stream: column 3 section LENGTH start: 139330 length 38 - Stream: column 3 section DICTIONARY_DATA start: 139368 length 133 - Encoding column 0: DIRECT - Encoding column 1: DIRECT - Encoding column 2: DIRECT + Stripe: offset: 53080 data: 63748 rows: 5000 tail: 73 index: 120 + Stream: column 0 section ROW_INDEX start: 53080 length 10 + 
Stream: column 1 section ROW_INDEX start: 53090 length 36 + Stream: column 2 section ROW_INDEX start: 53126 length 39 + Stream: column 3 section ROW_INDEX start: 53165 length 35 + Stream: column 1 section DATA start: 53200 length 20029 + Stream: column 2 section DATA start: 73229 length 40035 + Stream: column 3 section DATA start: 113264 length 3526 + Stream: column 3 section LENGTH start: 116790 length 25 + Stream: column 3 section DICTIONARY_DATA start: 116815 length 133 + Encoding column 0: DIRECT_V2 + Encoding column 1: DIRECT_V2 + Encoding column 2: DIRECT_V2 Encoding column 3: DICTIONARY[35] - Stripe: offset: 139574 data: 69570 rows: 5000 tail: 73 index: 120 - Stream: column 0 section ROW_INDEX start: 139574 length 10 - Stream: column 1 section ROW_INDEX start: 139584 length 36 - Stream: column 2 section ROW_INDEX start: 139620 length 39 - Stream: column 3 section ROW_INDEX start: 139659 length 35 - Stream: column 1 section DATA start: 139694 length 22594 - Stream: column 2 section DATA start: 162288 length 43415 - Stream: column 3 section DATA start: 205703 length 3390 - Stream: column 3 section LENGTH start: 209093 length 38 - Stream: column 3 section DICTIONARY_DATA start: 209131 length 133 - Encoding column 0: DIRECT - Encoding column 1: DIRECT - Encoding column 2: DIRECT + Stripe: offset: 117021 data: 63769 rows: 5000 tail: 73 index: 120 + Stream: column 0 section ROW_INDEX start: 117021 length 10 + Stream: column 1 section ROW_INDEX start: 117031 length 36 + Stream: column 2 section ROW_INDEX start: 117067 length 39 + Stream: column 3 section ROW_INDEX start: 117106 length 35 + Stream: column 1 section DATA start: 117141 length 20029 + Stream: column 2 section DATA start: 137170 length 40035 + Stream: column 3 section DATA start: 177205 length 3547 + Stream: column 3 section LENGTH start: 180752 length 25 + Stream: column 3 section DICTIONARY_DATA start: 180777 length 133 + Encoding column 0: DIRECT_V2 + Encoding column 1: DIRECT_V2 + Encoding column 2: DIRECT_V2 Encoding column 3: DICTIONARY[35] - Stripe: offset: 209337 data: 69551 rows: 5000 tail: 72 index: 119 - Stream: column 0 section ROW_INDEX start: 209337 length 10 - Stream: column 1 section ROW_INDEX start: 209347 length 35 - Stream: column 2 section ROW_INDEX start: 209382 length 39 - Stream: column 3 section ROW_INDEX start: 209421 length 35 - Stream: column 1 section DATA start: 209456 length 22575 - Stream: column 2 section DATA start: 232031 length 43426 - Stream: column 3 section DATA start: 275457 length 3379 - Stream: column 3 section LENGTH start: 278836 length 38 - Stream: column 3 section DICTIONARY_DATA start: 278874 length 133 - Encoding column 0: DIRECT - Encoding column 1: DIRECT - Encoding column 2: DIRECT - Encoding column 3: DICTIONARY[35] - Stripe: offset: 279079 data: 14096 rows: 1000 tail: 68 index: 120 - Stream: column 0 section ROW_INDEX start: 279079 length 10 - Stream: column 1 section ROW_INDEX start: 279089 length 36 - Stream: column 2 section ROW_INDEX start: 279125 length 39 - Stream: column 3 section ROW_INDEX start: 279164 length 35 - Stream: column 1 section DATA start: 279199 length 4529 - Stream: column 2 section DATA start: 283728 length 8690 - Stream: column 3 section DATA start: 292418 length 706 - Stream: column 3 section LENGTH start: 293124 length 38 - Stream: column 3 section DICTIONARY_DATA start: 293162 length 133 - Encoding column 0: DIRECT - Encoding column 1: DIRECT - Encoding column 2: DIRECT + Stripe: offset: 180983 data: 63785 rows: 5000 tail: 73 index: 120 + Stream: 
column 0 section ROW_INDEX start: 180983 length 10 + Stream: column 1 section ROW_INDEX start: 180993 length 36 + Stream: column 2 section ROW_INDEX start: 181029 length 39 + Stream: column 3 section ROW_INDEX start: 181068 length 35 + Stream: column 1 section DATA start: 181103 length 20029 + Stream: column 2 section DATA start: 201132 length 40035 + Stream: column 3 section DATA start: 241167 length 3563 + Stream: column 3 section LENGTH start: 244730 length 25 + Stream: column 3 section DICTIONARY_DATA start: 244755 length 133 + Encoding column 0: DIRECT_V2 + Encoding column 1: DIRECT_V2 + Encoding column 2: DIRECT_V2 Encoding column 3: DICTIONARY[35] + Stripe: offset: 244961 data: 23837 rows: 1857 tail: 68 index: 120 + Stream: column 0 section ROW_INDEX start: 244961 length 10 + Stream: column 1 section ROW_INDEX start: 244971 length 36 + Stream: column 2 section ROW_INDEX start: 245007 length 39 + Stream: column 3 section ROW_INDEX start: 245046 length 35 + Stream: column 1 section DATA start: 245081 length 7439 + Stream: column 2 section DATA start: 252520 length 14870 + Stream: column 3 section DATA start: 267390 length 1370 + Stream: column 3 section LENGTH start: 268760 length 25 + Stream: column 3 section DICTIONARY_DATA start: 268785 length 133 + Encoding column 0: DIRECT_V2 + Encoding column 1: DIRECT_V2 + Encoding column 2: DIRECT_V2 + Encoding column 3: DICTIONARY[35] \ No newline at end of file
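
Appended for reference: a few standalone sketches of the helpers this patch introduces. First, the zigzag mapping used by writeVslong/readVslong and by the new RunLengthIntegerWriterV2 before bit packing; the encode/decode bodies mirror the patch, while the ZigzagDemo wrapper class exists only for illustration.

public class ZigzagDemo {
  // same expression as SerializationUtils.zigzagEncode in the patch:
  // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ... so small magnitudes stay small
  static long zigzagEncode(long val) {
    return (val << 1) ^ (val >> 63);
  }

  // same expression as SerializationUtils.zigzagDecode: the inverse mapping
  static long zigzagDecode(long val) {
    return (val >>> 1) ^ -(val & 1);
  }

  public static void main(String[] args) {
    long[] samples = {0L, -1L, 1L, -2L, 2L, Long.MIN_VALUE, Long.MAX_VALUE};
    for (long v : samples) {
      long enc = zigzagEncode(v);
      System.out.println(v + " -> " + enc + " -> " + zigzagDecode(enc));
    }
  }
}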
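
Next, the set of aligned bit widths the new encodings round up to: 1..24 exactly, then 26, 28, 30, 32, 40, 48, 56, 64 (see the FixedBitSizes enum). The method body mirrors getClosestFixedBits from the patch; the demo class is hypothetical.

public class ClosestFixedBitsDemo {
  static int getClosestFixedBits(int n) {
    if (n == 0) {
      return 1;
    }
    if (n >= 1 && n <= 24) {
      return n;
    } else if (n <= 26) {
      return 26;
    } else if (n <= 28) {
      return 28;
    } else if (n <= 30) {
      return 30;
    } else if (n <= 32) {
      return 32;
    } else if (n <= 40) {
      return 40;
    } else if (n <= 48) {
      return 48;
    } else if (n <= 56) {
      return 56;
    } else {
      return 64;
    }
  }

  public static void main(String[] args) {
    int[] samples = {0, 1, 17, 25, 27, 33, 41, 57, 64};
    for (int n : samples) {
      System.out.println(n + " bits -> " + getClosestFixedBits(n) + " bits");
    }
  }
}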
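
percentileBits is what lets the writer size data at, say, the 90th or 95th percentile bit width and treat the few larger values as patches. A simplified, self-contained version of the same histogram idea; it uses raw widths 1..64 rather than the FixedBitSizes ordinals the patch histograms over, so it is an illustration, not the patch's method.

import java.util.Random;

public class PercentileBitsDemo {
  // number of bits needed to represent the (non-negative) value, at least 1
  static int bitsRequired(long value) {
    int count = 0;
    while (value != 0) {
      count++;
      value >>>= 1;
    }
    return Math.max(count, 1);
  }

  // histogram the per-value bit widths, then walk down from the widest width,
  // skipping at most (1 - p) of the values, and report the width reached
  static int percentileBits(long[] data, double p) {
    int[] hist = new int[65];
    for (long v : data) {
      hist[bitsRequired(v)]++;
    }
    int allowedOutliers = (int) (data.length * (1.0 - p));
    for (int bits = 64; bits >= 1; bits--) {
      allowedOutliers -= hist[bits];
      if (allowedOutliers < 0) {
        return bits;
      }
    }
    return 1;
  }

  public static void main(String[] args) {
    Random rand = new Random(0);
    long[] data = new long[512];
    for (int i = 0; i < data.length; i++) {
      data[i] = rand.nextInt(100);   // mostly small values (at most 7 bits)
    }
    data[255] = 20000L;              // one large outlier (15 bits)
    System.out.println("90th percentile bits:  " + percentileBits(data, 0.90));
    System.out.println("100th percentile bits: " + percentileBits(data, 1.00));
  }
}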
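
The new writeInts/readInts helpers pack each value into exactly bitSize bits, MSB first, across byte boundaries, with the last byte zero padded. Below is a self-contained round-trip sketch of that layout using plain byte arrays; the patch's methods work against OutputStream/InStream instead, so this is a re-implementation for illustration only.

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class BitPackDemo {

  static byte[] pack(long[] values, int bitSize) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int bitsLeft = 8;
    int current = 0;
    for (long value : values) {
      int bitsToWrite = bitSize;
      while (bitsToWrite > bitsLeft) {
        // the top bits of the value fill up the current byte
        current |= (int) (value >>> (bitsToWrite - bitsLeft));
        bitsToWrite -= bitsLeft;
        value &= (1L << bitsToWrite) - 1;   // keep only the unwritten low bits
        out.write(current);
        current = 0;
        bitsLeft = 8;
      }
      bitsLeft -= bitsToWrite;
      current |= (int) (value << bitsLeft);
      if (bitsLeft == 0) {
        out.write(current);
        current = 0;
        bitsLeft = 8;
      }
    }
    if (bitsLeft != 8) {
      out.write(current);                    // flush the zero padded tail byte
    }
    return out.toByteArray();
  }

  static long[] unpack(byte[] packed, int count, int bitSize) {
    long[] result = new long[count];
    int pos = 0;
    int bitsLeft = 0;
    int current = 0;
    for (int i = 0; i < count; i++) {
      long value = 0;
      int bitsToRead = bitSize;
      while (bitsToRead > bitsLeft) {
        value <<= bitsLeft;
        value |= current & ((1 << bitsLeft) - 1);
        bitsToRead -= bitsLeft;
        current = packed[pos++] & 0xff;
        bitsLeft = 8;
      }
      if (bitsToRead > 0) {
        value <<= bitsToRead;
        bitsLeft -= bitsToRead;
        value |= (current >> bitsLeft) & ((1 << bitsToRead) - 1);
      }
      result[i] = value;
    }
    return result;
  }

  public static void main(String[] args) {
    long[] input = {1, 5, 7, 0, 3, 6, 2, 4};
    byte[] packed = pack(input, 3);          // 8 values * 3 bits = 3 bytes
    long[] output = unpack(packed, input.length, 3);
    System.out.println(packed.length + " bytes, round trip ok: "
        + Arrays.equals(input, output));
  }
}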
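
Finally, the gapVsPatchList entries built in RunLengthIntegerWriterV2 for the patched-base case store the gap to the previous patched position in the bits above patchWidth and the patch value in the low patchWidth bits, splitting any gap larger than 255 into 255-gap filler entries with a zero patch. A minimal packing/unpacking sketch; packEntry/unpackGap/unpackPatch are hypothetical helpers, not methods from the patch.

public class GapPatchDemo {
  // gap in the high bits, patch value in the low patchWidth bits
  static long packEntry(long gap, long patch, int patchWidth) {
    return (gap << patchWidth) | patch;
  }

  static long unpackGap(long entry, int patchWidth) {
    return entry >>> patchWidth;
  }

  static long unpackPatch(long entry, int patchWidth) {
    return entry & ((1L << patchWidth) - 1);
  }

  public static void main(String[] args) {
    int patchWidth = 2;                      // patch values need 2 bits here
    long entry = packEntry(195, 2, patchWidth);
    System.out.println("gap=" + unpackGap(entry, patchWidth)
        + " patch=" + unpackPatch(entry, patchWidth));

    // A gap of 300 does not fit in the 8-bit gap field, so the writer emits a
    // filler entry of gap 255 with patch 0, then (300 - 255, actual patch).
    long filler = packEntry(255, 0, patchWidth);
    long real = packEntry(300 - 255, 2, patchWidth);
    System.out.println("filler gap=" + unpackGap(filler, patchWidth)
        + ", real gap=" + unpackGap(real, patchWidth));
  }
}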