diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index f6acc00..cce1415 100644
--- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -5157,10 +5157,12 @@ public ColumnEncoding getDefaultInstanceForType() {
implements com.google.protobuf.ProtocolMessageEnum {
DIRECT(0, 0),
DICTIONARY(1, 1),
+ DIRECT_V2(2, 2),
;
public static final int DIRECT_VALUE = 0;
public static final int DICTIONARY_VALUE = 1;
+ public static final int DIRECT_V2_VALUE = 2;
public final int getNumber() { return value; }
@@ -5169,6 +5171,7 @@ public static Kind valueOf(int value) {
switch (value) {
case 0: return DIRECT;
case 1: return DICTIONARY;
+ case 2: return DIRECT_V2;
default: return null;
}
}
@@ -5199,7 +5202,7 @@ public Kind findValueByNumber(int number) {
}
private static final Kind[] VALUES = {
- DIRECT, DICTIONARY,
+ DIRECT, DICTIONARY, DIRECT_V2,
};
public static Kind valueOf(
@@ -10568,41 +10571,42 @@ void setMagic(com.google.protobuf.ByteString value) {
"nd\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Ki" +
"nd\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n" +
"\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004" +
- "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n\016Colum",
+ "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\240\001\n\016Colum",
"nEncoding\022C\n\004kind\030\001 \002(\01625.org.apache.had" +
"oop.hive.ql.io.orc.ColumnEncoding.Kind\022\026" +
- "\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006DIRECT" +
- "\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFooter\0229\n\007s" +
- "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" +
- ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap" +
- "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
- "g\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" +
- "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" +
- "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\260\001\n\004Kind",
- "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" +
- "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" +
- "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" +
- "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" +
- "\007DECIMAL\020\016\"x\n\021StripeInformation\022\016\n\006offse" +
- "t\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLeng" +
- "th\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014number" +
- "OfRows\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name" +
- "\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Footer\022\024\n\014head" +
- "erLength\030\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n",
- "\007stripes\030\003 \003(\01323.org.apache.hadoop.hive." +
- "ql.io.orc.StripeInformation\0225\n\005types\030\004 \003" +
- "(\0132&.org.apache.hadoop.hive.ql.io.orc.Ty" +
- "pe\022D\n\010metadata\030\005 \003(\01322.org.apache.hadoop" +
- ".hive.ql.io.orc.UserMetadataItem\022\024\n\014numb" +
- "erOfRows\030\006 \001(\004\022F\n\nstatistics\030\007 \003(\01322.org" +
- ".apache.hadoop.hive.ql.io.orc.ColumnStat" +
- "istics\022\026\n\016rowIndexStride\030\010 \001(\r\"\255\001\n\nPostS" +
- "cript\022\024\n\014footerLength\030\001 \001(\004\022F\n\013compressi" +
- "on\030\002 \001(\01621.org.apache.hadoop.hive.ql.io.",
- "orc.CompressionKind\022\034\n\024compressionBlockS" +
- "ize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001\022\016\n\005magic\030" +
- "\300> \001(\t*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004Z" +
- "LIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
+ "\n\016dictionarySize\030\002 \001(\r\"1\n\004Kind\022\n\n\006DIRECT" +
+ "\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\"\214\001\n\014St" +
+ "ripeFooter\0229\n\007streams\030\001 \003(\0132(.org.apache" +
+ ".hadoop.hive.ql.io.orc.Stream\022A\n\007columns" +
+ "\030\002 \003(\01320.org.apache.hadoop.hive.ql.io.or" +
+ "c.ColumnEncoding\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162" +
+ "+.org.apache.hadoop.hive.ql.io.orc.Type." +
+ "Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames",
+ "\030\003 \003(\t\"\260\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t" +
+ "\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022" +
+ "\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tT" +
+ "IMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020" +
+ "\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\"x\n\021StripeInfor" +
+ "mation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 " +
+ "\001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030" +
+ "\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetad" +
+ "ataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n" +
+ "\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontent",
+ "Length\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apac" +
+ "he.hadoop.hive.ql.io.orc.StripeInformati" +
+ "on\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.hi" +
+ "ve.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.or" +
+ "g.apache.hadoop.hive.ql.io.orc.UserMetad" +
+ "ataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatist" +
+ "ics\030\007 \003(\01322.org.apache.hadoop.hive.ql.io" +
+ ".orc.ColumnStatistics\022\026\n\016rowIndexStride\030" +
+ "\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001" +
+ "(\004\022F\n\013compression\030\002 \001(\01621.org.apache.had",
+ "oop.hive.ql.io.orc.CompressionKind\022\034\n\024co" +
+ "mpressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(" +
+ "\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind" +
+ "\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020" +
+ "\003"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
new file mode 100644
index 0000000..14d031c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading integers.
+ */
+ interface IntegerReader {
+
+ /**
+ * Seek to the position provided by index.
+ *
+ * @param index provider of the position to seek to
+ * @throws IOException if the seek fails
+ */
+ void seek(PositionProvider index) throws IOException;
+
+ /**
+ * Skip the specified number of values.
+ *
+ * @param numValues number of values to skip
+ * @throws IOException if the values cannot be skipped
+ */
+ void skip(long numValues) throws IOException;
+
+ /**
+ * Check if there are any more values left.
+ *
+ * @return true if at least one more value can be read
+ * @throws IOException if the check fails
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return the next available value.
+ *
+ * @return the next value in the sequence
+ * @throws IOException if the value cannot be read
+ */
+ long next() throws IOException;
+ }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
new file mode 100644
index 0000000..ab69d49
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+ interface IntegerWriter {
+
+ /**
+ * Record the writer's current position into the given recorder.
+ *
+ * @param recorder receives the current position
+ * @throws IOException if the position cannot be obtained
+ */
+ void getPosition(PositionRecorder recorder) throws IOException;
+
+ /**
+ * Write the integer value.
+ *
+ * @param value the value to write
+ * @throws IOException if the value cannot be written
+ */
+ void write(long value) throws IOException;
+
+ /**
+ * Flush the buffer.
+ *
+ * @throws IOException if the flush fails
+ */
+ void flush() throws IOException;
+ }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 06103e3..a85f9cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -114,14 +114,18 @@ public long getNext() {
protected final int columnId;
private BitFieldReader present = null;
protected boolean valuePresent = false;
+ protected boolean isDirectV2;
TreeReader(Path path, int columnId) {
this.path = path;
this.columnId = columnId;
+ // FIXME: make it user configurable?
+ this.isDirectV2 = true;
}
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId + " of " + path);
}
@@ -130,6 +134,9 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
void startStripe(Map streams,
List encoding
) throws IOException {
+ if(encoding.get(columnId).getKind() == OrcProto.ColumnEncoding.Kind.DIRECT) {
+ this.isDirectV2 = false;
+ }
checkEncoding(encoding.get(columnId));
InStream in = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.PRESENT));
@@ -263,7 +270,7 @@ void skipRows(long items) throws IOException {
}
private static class ShortTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
ShortTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -276,7 +283,11 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (super.isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
@@ -307,7 +318,7 @@ void skipRows(long items) throws IOException {
}
private static class IntTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
IntTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -320,7 +331,11 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
@@ -351,7 +366,7 @@ void skipRows(long items) throws IOException {
}
private static class LongTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
LongTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -364,7 +379,11 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
@@ -489,7 +508,7 @@ void skipRows(long items) throws IOException {
private static class BinaryTreeReader extends TreeReader{
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
BinaryTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -503,9 +522,15 @@ void startStripe(Map streams,
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
- false);
+ if (isDirectV2) {
+ lengths = new RunLengthIntegerReaderV2(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
}
@Override
@@ -544,7 +569,7 @@ Object next(Object previous) throws IOException {
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
- for(int i=0; i < items; ++i) {
+ for (int i = 0; i < items; ++i) {
lengthToSkip += lengths.next();
}
stream.skip(lengthToSkip);
@@ -552,8 +577,8 @@ void skipRows(long items) throws IOException {
}
private static class TimestampTreeReader extends TreeReader{
- private RunLengthIntegerReader data;
- private RunLengthIntegerReader nanos;
+ private IntegerReader data = null;
+ private IntegerReader nanos = null;
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -564,10 +589,17 @@ void startStripe(Map streams,
List encodings
) throws IOException {
super.startStripe(streams, encodings);
- data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.DATA)), true);
- nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.SECONDARY)), false);
+ if (isDirectV2) {
+ data = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ } else {
+ data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ }
}
@Override
@@ -587,9 +619,12 @@ Object next(Object previous) throws IOException {
} else {
result = (Timestamp) previous;
}
- long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
+ long millis = 0;
+ int newNanos = 0;
+ millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
WriterImpl.MILLIS_PER_SECOND;
- int newNanos = parseNanos(nanos.next());
+ newNanos = parseNanos(nanos.next());
+
// fix the rounding when we divided by 1000.
if (millis >= 0) {
millis += newNanos / 1000000;
@@ -623,7 +658,7 @@ void skipRows(long items) throws IOException {
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
- private RunLengthIntegerReader scaleStream;
+ private IntegerReader scaleStream = null;
DecimalTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -636,8 +671,13 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
valueStream = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
- scaleStream = new RunLengthIntegerReader(streams.get(
- new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ if (isDirectV2) {
+ scaleStream = new RunLengthIntegerReaderV2(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ } else {
+ scaleStream = new RunLengthIntegerReader(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ }
}
@Override
@@ -671,7 +711,7 @@ void skipRows(long items) throws IOException {
private DynamicByteArray dictionaryBuffer = null;
private int dictionarySize;
private int[] dictionaryOffsets;
- private RunLengthIntegerReader reader;
+ private IntegerReader reader = null;
StringTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -706,13 +746,18 @@ void startStripe(Map streams,
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = streams.get(name);
- RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ IntegerReader lenReader = null;
+ if(isDirectV2) {
+ lenReader = new RunLengthIntegerReaderV2(in, false);
+ } else {
+ lenReader = new RunLengthIntegerReader(in, false);
+ }
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
dictionaryOffsets = new int[dictionarySize + 1];
}
- for(int i=0; i < dictionarySize; ++i) {
+ for (int i = 0; i < dictionarySize; ++i) {
dictionaryOffsets[i] = offset;
offset += (int) lenReader.next();
}
@@ -721,7 +766,11 @@ void startStripe(Map streams,
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), false);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), false);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), false);
+ }
}
@Override
@@ -735,7 +784,8 @@ Object next(Object previous) throws IOException {
super.next(previous);
Text result = null;
if (valuePresent) {
- int entry = (int) reader.next();
+ int entry = 0;
+ entry = (int) reader.next();
if (previous == null) {
result = new Text();
} else {
@@ -919,7 +969,7 @@ void skipRows(long items) throws IOException {
private static class ListTreeReader extends TreeReader {
private final TreeReader elementReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
ListTreeReader(Path path, int columnId,
List types,
@@ -972,8 +1022,13 @@ void startStripe(Map streams,
List encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ if (isDirectV2) {
+ lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ }
if (elementReader != null) {
elementReader.startStripe(streams, encodings);
}
@@ -983,7 +1038,7 @@ void startStripe(Map streams,
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long childSkip = 0;
- for(long i=0; i < items; ++i) {
+ for (long i = 0; i < items; ++i) {
childSkip += lengths.next();
}
elementReader.skipRows(childSkip);
@@ -993,7 +1048,7 @@ void skipRows(long items) throws IOException {
private static class MapTreeReader extends TreeReader {
private final TreeReader keyReader;
private final TreeReader valueReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
MapTreeReader(Path path,
int columnId,
@@ -1050,8 +1105,13 @@ void startStripe(Map streams,
List encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ if (isDirectV2) {
+ lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ }
if (keyReader != null) {
keyReader.startStripe(streams, encodings);
}
@@ -1064,7 +1124,7 @@ void startStripe(Map streams,
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long childSkip = 0;
- for(long i=0; i < items; ++i) {
+ for (long i = 0; i < items; ++i) {
childSkip += lengths.next();
}
keyReader.skipRows(childSkip);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
index 2825c64..9c825eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
@@ -23,7 +23,7 @@
/**
* A reader that reads a sequence of integers.
* */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
private final InStream input;
private final boolean signed;
private final long[] literals =
@@ -71,11 +71,11 @@ private void readValues() throws IOException {
}
}
- boolean hasNext() throws IOException {
+ public boolean hasNext() throws IOException {
return used != numLiterals || input.available() > 0;
}
- long next() throws IOException {
+ public long next() throws IOException {
long result;
if (used == numLiterals) {
readValues();
@@ -88,7 +88,7 @@ long next() throws IOException {
return result;
}
- void seek(PositionProvider index) throws IOException {
+ public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
if (consumed != 0) {
@@ -104,7 +104,7 @@ void seek(PositionProvider index) throws IOException {
}
}
- void skip(long numValues) throws IOException {
+ public void skip(long numValues) throws IOException {
while (numValues > 0) {
if (used == numLiterals) {
readValues();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..0a8f0e9
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+
+/**
+ * A reader that reads a sequence of light weight compressed integers. Refer
+ * {@link RunLengthIntegerWriterV2} for description of various lightweight
+ * compression techniques.
+ */
+ class RunLengthIntegerReaderV2 implements IntegerReader {
+   private final InStream input;
+   private final boolean signed; // true when values are decoded as zigzag/signed vints
+   private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+   private int numLiterals = 0; // count of decoded values currently held in literals
+   private int used = 0; // index of the next value in literals handed out by next()
+
+   RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException {
+     this.input = input;
+     this.signed = signed;
+   }
+
+   // Decodes the next run from the stream and appends its values to literals.
+   private void readValues() throws IOException {
+     // the top 2 bits of the header byte select the encoding type
+     int firstByte = input.read();
+     if (firstByte < 0) {
+       throw new EOFException("Read past end of RLE integer from " + input);
+     }
+     int enc = (firstByte >>> 6) & 0x03;
+     if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
+       readShortRepeatValues(firstByte);
+     } else if (EncodingType.DIRECT.ordinal() == enc) {
+       readDirectValues(firstByte);
+     } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
+       readPatchedBaseValues(firstByte);
+     } else {
+       readDeltaValues(firstByte);
+     }
+   }
+
+   private void readDeltaValues(int firstByte) throws IOException {
+     // DELTA run: a first value followed by a fixed delta or bit-packed deltas
+     // extract the number of fixed bits
+     int fb = (firstByte >>> 1) & 0x1f;
+     if (fb != 0) {
+       fb = SerializationUtils.decodeBitWidth(fb);
+     }
+
+     // extract the blob run length
+     int len = (firstByte & 0x01) << 8;
+     len |= input.read();
+
+     // read the first value stored as vint
+     long firstVal = 0;
+     if (signed) {
+       firstVal = SerializationUtils.readVslong(input);
+     } else {
+       firstVal = SerializationUtils.readVulong(input);
+     }
+
+     // store first value to result buffer
+     long prevVal = firstVal;
+     literals[numLiterals++] = firstVal;
+
+     // if fixed bits is 0 then all values have fixed delta
+     if (fb == 0) {
+       // read the fixed delta value stored as vint (deltas can be negative even
+       // if all numbers are positive)
+       long fd = SerializationUtils.readVslong(input);
+       // each new value is the previously stored value plus the fixed delta
+       for (int i = 0; i < len; i++) {
+         literals[numLiterals] = literals[numLiterals - 1] + fd;
+         numLiterals++;
+       }
+     } else {
+       long deltaBase = SerializationUtils.readVslong(input);
+       // add delta base and first value
+       literals[numLiterals++] = firstVal + deltaBase;
+       prevVal = literals[numLiterals - 1];
+       len -= 1;
+
+       // read the bit-packed deltas into the result buffer, then replace each
+       // with the previous value plus/minus that delta. if the delta base is
+       // negative then it is a decreasing sequence else an increasing sequence
+       SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+       while (len > 0) {
+         if (deltaBase < 0) {
+           literals[numLiterals] = prevVal - literals[numLiterals];
+         } else {
+           literals[numLiterals] = prevVal + literals[numLiterals];
+         }
+         prevVal = literals[numLiterals];
+         len--;
+         numLiterals++;
+       }
+     }
+   }
+
+   private void readPatchedBaseValues(int firstByte) throws IOException {
+     // PATCHED_BASE run: bit-packed offsets from a base plus a list of patches
+     // extract the number of fixed bits
+     int fbo = (firstByte >>> 1) & 0x1f;
+     int fb = SerializationUtils.decodeBitWidth(fbo);
+
+     // extract the run length of data blob
+     int len = (firstByte & 0x01) << 8;
+     len |= input.read();
+     // runs are always one off
+     len += 1;
+
+     // extract the number of bytes occupied by base
+     int thirdByte = input.read();
+     int bw = (thirdByte >>> 5) & 0x07;
+     // base width is one off
+     bw += 1;
+
+     // extract patch width
+     int pwo = thirdByte & 0x1f;
+     int pw = SerializationUtils.decodeBitWidth(pwo);
+
+     // read fourth byte and extract patch gap width
+     int fourthByte = input.read();
+     int pgw = (fourthByte >>> 5) & 0x07;
+     // patch gap width is one off
+     pgw += 1;
+
+     // extract the length of the patch list
+     int pl = fourthByte & 0x1f;
+
+     // read the next base width number of bytes to extract base value
+     long base = SerializationUtils.bytesToLongBE(input, bw);
+     base = SerializationUtils.zigzagDecode(base);
+
+     // unpack the data blob
+     long[] unpacked = new long[len];
+     SerializationUtils.readInts(unpacked, 0, len, fb, input);
+
+     // unpack the patch blob
+     long[] unpackedPatch = new long[pl];
+     SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input);
+
+     // mask selecting the low pw bits (the patch value) of a patch entry; built
+     // from a long literal because an int shift wraps for widths of 32 or more
+     long patchMask = (1L << pw) - 1;
+     int patchIdx = 0;
+     long currGap = unpackedPatch[patchIdx] >>> pw;
+     long currPatch = unpackedPatch[patchIdx] & patchMask;
+     long actualGap = 0;
+
+     // special case: gap is >255 then patch value will be 0.
+     // if gap is <=255 then patch value cannot be 0
+     while (currGap == 255 && currPatch == 0) {
+       actualGap += 255;
+       patchIdx++;
+       currGap = unpackedPatch[patchIdx] >>> pw;
+       currPatch = unpackedPatch[patchIdx] & patchMask;
+     }
+     // add the left over gap
+     actualGap += currGap;
+
+     // unpack data blob, patch it (if required), add base to get final result
+     for (int i = 0; i < unpacked.length; i++) {
+       if (i == actualGap) {
+         // apply patch: it supplies the bits above the fb-bit packed value
+         long patchedVal = unpacked[i] | (currPatch << fb);
+
+         // add base to patched value
+         literals[numLiterals++] = base + patchedVal;
+
+         // increment the patch to point to next entry in patch list
+         patchIdx++;
+
+         if (patchIdx < pl) {
+           // read the next gap and patch
+           currGap = unpackedPatch[patchIdx] >>> pw;
+           currPatch = unpackedPatch[patchIdx] & patchMask;
+           actualGap = 0;
+
+           // special case: gap is >255 then patch will be 0. if gap is
+           // <=255 then patch cannot be 0
+           while (currGap == 255 && currPatch == 0) {
+             actualGap += 255;
+             patchIdx++;
+             currGap = unpackedPatch[patchIdx] >>> pw;
+             currPatch = unpackedPatch[patchIdx] & patchMask;
+           }
+           // add the left over gap
+           actualGap += currGap;
+
+           // next gap is relative to the current gap
+           actualGap += i;
+         }
+       } else {
+         // no patching required. add base to unpacked value to get final value
+         literals[numLiterals++] = base + unpacked[i];
+       }
+     }
+   }
+
+   private void readDirectValues(int firstByte) throws IOException {
+     // DIRECT run: values bit-packed with one fixed width, no base or deltas
+     // extract the number of fixed bits
+     int fbo = (firstByte >>> 1) & 0x1f;
+     int fb = SerializationUtils.decodeBitWidth(fbo);
+
+     // extract the run length
+     int len = (firstByte & 0x01) << 8;
+     len |= input.read();
+     // runs are one off
+     len += 1;
+
+     // unpack the values into the result buffer; zigzag decode them when signed
+     SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+     if (signed) {
+       for (int i = 0; i < len; i++) {
+         literals[numLiterals] = SerializationUtils.zigzagDecode(literals[numLiterals]);
+         numLiterals++;
+       }
+     } else {
+       numLiterals += len;
+     }
+   }
+
+   private void readShortRepeatValues(int firstByte) throws IOException {
+     // SHORT_REPEAT run: one value stored once and repeated len times
+     // read the number of bytes occupied by the value
+     int size = (firstByte >>> 3) & 0x07;
+     // #bytes are one off
+     size += 1;
+
+     // read the run length
+     int len = firstByte & 0x07;
+     // run lengths values are stored only after MIN_REPEAT value is met
+     len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+     // read the repeated value which is stored using fixed bytes
+     long val = SerializationUtils.bytesToLongBE(input, size);
+
+     if (signed) {
+       val = SerializationUtils.zigzagDecode(val);
+     }
+
+     // repeat the value for length times
+     for (int i = 0; i < len; i++) {
+       literals[numLiterals++] = val;
+     }
+   }
+
+   // True while buffered values remain or the input reports bytes available.
+   public boolean hasNext() throws IOException {
+     return used != numLiterals || input.available() > 0;
+   }
+
+   // Returns the next value, decoding a fresh run once the buffer is drained.
+   public long next() throws IOException {
+     if (used == numLiterals) {
+       // buffer exhausted: reset it and decode the next run
+       numLiterals = 0;
+       used = 0;
+       readValues();
+     }
+     return literals[used++];
+   }
+
+   public void seek(PositionProvider index) throws IOException {
+     input.seek(index);
+     // values to skip past after the stream itself has been repositioned
+     int consumed = (int) index.getNext();
+     if (consumed != 0) {
+       // loop because the skipped values may span more than one run
+       while (consumed > 0) {
+         numLiterals = 0;
+         readValues();
+         used = consumed;
+         consumed -= numLiterals;
+       }
+     } else {
+       used = 0;
+       numLiterals = 0;
+     }
+   }
+
+   public void skip(long numValues) throws IOException {
+     while (numValues > 0) {
+       if (used == numLiterals) {
+         numLiterals = 0;
+         used = 0;
+         readValues();
+       }
+       long consume = Math.min(numValues, numLiterals - used);
+       used += consume;
+       numValues -= consume;
+     }
+   }
+ }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
index aaca0a1..0497a56 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
@@ -25,7 +25,7 @@
* repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
* literal vint values follow.
*/
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
static final int MIN_REPEAT_SIZE = 3;
static final int MAX_DELTA = 127;
static final int MIN_DELTA = -128;
@@ -71,12 +71,12 @@ private void writeValues() throws IOException {
}
}
- void flush() throws IOException {
+ public void flush() throws IOException {
writeValues();
output.flush();
}
- void write(long value) throws IOException {
+ public void write(long value) throws IOException {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
@@ -130,8 +130,9 @@ void write(long value) throws IOException {
}
}
- void getPosition(PositionRecorder recorder) throws IOException {
- output.getPosition(recorder);
- recorder.addPosition(numLiterals);
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
}
+
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..bbcd887
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
@@ -0,0 +1,808 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs light weight compression over sequence of integers.
+ *
+ * There are four types of lightweight integer compression
+ *
+ * - SHORT_REPEAT
+ * - DIRECT
+ * - PATCHED_BASE
+ * - DELTA
+ *
+ *
+ * The description and format for these types are as below:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *
+ * - 1 byte header
+ *
+ * - 2 bits for encoding type
+ * - 3 bits for bytes required for repeating value
+ * - 3 bits for repeat count (MIN_REPEAT + run length)
+ *
+ *
+ * - Blob - repeat value (fixed bytes)
+ *
+ *
+ *
+ * DIRECT: Used for random integer sequences whose number of bit requirement doesn't vary a
+ * lot.
+ *
+ * - 2 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * - Blob - fixed width * run length bits long
+ *
+ *
+ *
+ * PATCHED_BASE: Used for random integer sequences whose number of bit requirement varies
+ * beyond a threshold.
+ *
+ * - 4 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * 3rd byte
+ * - 3 bits for bytes required for base value
+ * - 5 bits for patch width
+ *
+ *
+ * 4th byte
+ * - 3 bits for patch gap width
+ * - 5 bits for patch length
+ *
+ *
+ * - Base value - base width * 8 bits
+ * - Data blob - fixed width * run length
+ * - Patch blob - (patch width + patch gap width) * patch length
+ *
+ *
+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences, sequences with fixed
+ * delta values or long repeated sequences.
+ *
+ * - 2 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * - Base value - encoded as varint
+ * - Delta base - encoded as varint
+ * - Delta blob - only positive values. monotonicity is decided based on
+ * the sign of delta base
+ *
+ *
+ */
+class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+ // The declaration order matters: getOpcode() shifts the ordinal into the
+ // top two bits of the header byte.
+ public enum EncodingType {
+ SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+ }
+
+ // capacity of the literals buffer; a run is written out once its length
+ // reaches this value
+ static final int MAX_SCOPE = 512;
+ // minimum number of repeats needed for a SHORT_REPEAT run; also the offset
+ // subtracted from the run length stored in its header
+ static final int MIN_REPEAT = 3;
+ // longest run that is still written as SHORT_REPEAT
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ // widest repeated value, in bytes, that is still written as SHORT_REPEAT
+ private static final int MAX_SHORT_REPEAT_SIZE = 8;
+ // difference between the last two buffered values
+ private long prevDelta = 0;
+ // length of the current run of identical values
+ private int fixedRunLength = 0;
+ // length of the current run of varying values
+ private int variableRunLength = 0;
+ // values buffered since the last write to the output
+ private long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ // when true, values are zigzag encoded before being written
+ private final boolean signed;
+ // encoding chosen for the buffered run; null until one is selected
+ private EncodingType encoding;
+ // number of valid entries in literals
+ private int numLiterals;
+ // scratch arrays and widths filled by determineEncoding(): zigzag values
+ // (direct), base reduced values (patched base) and deltas (delta)
+ private long[] zigzagLiterals;
+ private long[] baseRedLiterals;
+ private long[] adjDeltas;
+ private long fixedDelta;
+ private int zzBits90p;
+ private int zzBits100p;
+ private int brBits95p;
+ private int brBits100p;
+ private int bitsDeltaMax;
+ // patch description filled by preparePatchedBlob()
+ private int patchWidth;
+ private int patchGapWidth;
+ private int patchLength;
+ private long[] gapVsPatchList;
+ // smallest zigzag encoded value of the run; written as the patched base
+ private long zzMin;
+ private boolean isFixedDelta;
+
+ /**
+ * Creates a writer that sends encoded runs to the given stream.
+ *
+ * @param output
+ * stream receiving the encoded bytes
+ * @param signed
+ * whether values are zigzag encoded before being written
+ */
+ RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+ this.output = output;
+ this.signed = signed;
+ clear();
+ }
+
+  /**
+   * Writes the buffered literals using the currently selected encoding and
+   * then resets the per-run state. Does nothing when the buffer is empty.
+   *
+   * @throws IOException
+   *           if writing to the output stream fails
+   */
+  private void writeValues() throws IOException {
+    if (numLiterals == 0) {
+      return;
+    }
+
+    switch (encoding) {
+      case SHORT_REPEAT:
+        writeShortRepeatValues();
+        break;
+      case DIRECT:
+        writeDirectValues();
+        break;
+      case PATCHED_BASE:
+        writePatchedBaseValues();
+        break;
+      default:
+        writeDeltaValues();
+        break;
+    }
+
+    // clear all the variables
+    clear();
+  }
+
+ /**
+ * Writes the buffered literals as one DELTA run: a two byte header, the
+ * first literal as a varint, then either the single fixed delta (when
+ * isFixedDelta is set) or the first delta followed by the bit packed
+ * remaining deltas.
+ *
+ * @throws IOException
+ * if writing to the output stream fails
+ */
+ private void writeDeltaValues() throws IOException {
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int efb = 0;
+
+ if (isFixedDelta) {
+ // if the fixed delta is 0 then the sequence is counted as fixed
+ // run length else as variable run length
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ // fixed width 0 is used for fixed delta runs. sequence that require
+ // only 1 bit to encode will have an additional bit
+ if (fb == 1) {
+ fb = 2;
+ }
+ efb = SerializationUtils.encodeBitWidth(fb);
+ efb = efb << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = len & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first literal as a varint (zigzag encoded when signed)
+ if (signed) {
+ SerializationUtils.writeVslong(output, literals[0]);
+ } else {
+ SerializationUtils.writeVulong(output, literals[0]);
+ }
+
+ // if delta is fixed then we don't need to store delta blob else store
+ // delta blob using fixed bit packing
+ if (isFixedDelta) {
+ // store the fixed delta using zigzag encoding. deltas can be negative
+ // even if all values are positive
+ SerializationUtils.writeVslong(output, fixedDelta);
+ } else {
+ // the first delta keeps its sign and is written as a zigzag varint; the
+ // remaining deltas are absolute values and are bit packed with width fb
+ SerializationUtils.writeVslong(output, adjDeltas[0]);
+ SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, output);
+ }
+ }
+
+ /**
+ * Writes the buffered literals as one PATCHED_BASE run: a four byte header,
+ * the base value (zzMin) in big endian bytes, the bit packed base reduced
+ * values and finally the bit packed gap/patch list. Expects
+ * preparePatchedBlob() to have filled the patch fields beforehand.
+ *
+ * @throws IOException
+ * if writing to the output stream fails
+ */
+ private void writePatchedBaseValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = brBits95p;
+ int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width
+ int baseWidth = SerializationUtils.findClosestNumBits(zzMin);
+ int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ int bb = (baseBytes - 1) << 5;
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+
+ // fourth byte contains 3 bits for patch gap width and 5 bits for
+ // patch length
+ int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for (int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((zzMin >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // bit pack the base reduced values and write each byte
+ int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
+ SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, closestFixedBits,
+ output);
+
+ // write the patch blob; each entry holds the gap in its upper bits and the
+ // patch in its lower bits
+ closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
+ + patchWidth);
+ SerializationUtils
+ .writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private int getOpcode() {
+ return encoding.ordinal() << 6;
+ }
+
+ private void writeDirectValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of
+ // runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit packing the delta values and write each bytes
+ SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, zzBits100p, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
+ int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // if the runs are long or too short and if the delta is non zero, then
+ // choose a different algorithm
+ if (fixedRunLength >= MIN_REPEAT
+ && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH
+ && numBytesRepeatVal <= MAX_SHORT_REPEAT_SIZE && prevDelta == 0) {
+ // write encoding type in top 2 bits
+ int header = getOpcode();
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the payload (i.e. the repeat value) in big endian
+ for (int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ fixedRunLength = 0;
+ } else {
+ determineEncoding();
+ writeValues();
+ }
+ }
+
+  /**
+   * Analyzes the buffered literals and selects the encoding for the run:
+   * DELTA for fixed delta or monotonic sequences, PATCHED_BASE when the bit
+   * widths of the zigzag values vary beyond a threshold and patching pays
+   * off, DIRECT otherwise. Also fills the scratch arrays and widths that the
+   * chosen writer needs.
+   */
+  private void determineEncoding() {
+    // used for direct encoding
+    zigzagLiterals = new long[numLiterals];
+
+    // used for patched base encoding
+    baseRedLiterals = new long[numLiterals];
+
+    // used for delta encoding
+    adjDeltas = new long[numLiterals - 1];
+
+    // for identifying monotonic sequences
+    boolean isIncreasing = false;
+    int increasingCount = 1;
+    boolean isDecreasing = false;
+    int decreasingCount = 1;
+
+    // for identifying type of delta encoding
+    long currMin = literals[0];
+    long currMax = literals[0];
+    isFixedDelta = true;
+    long currDelta = 0;
+
+    if (signed) {
+      zzMin = SerializationUtils.zigzagEncode(literals[0]);
+    } else {
+      zzMin = literals[0];
+    }
+    long deltaMax = 0;
+
+    // populate all variables to identify the encoding type
+    if (numLiterals >= 1) {
+      // the reference delta only exists when there are at least two values;
+      // with a single value literals[1] would be stale buffer content
+      if (numLiterals > 1) {
+        currDelta = literals[1] - literals[0];
+      }
+      for (int i = 0; i < numLiterals; i++) {
+        if (i > 0 && literals[i] >= currMax) {
+          currMax = literals[i];
+          increasingCount++;
+        }
+
+        if (i > 0 && literals[i] <= currMin) {
+          currMin = literals[i];
+          decreasingCount++;
+        }
+
+        // if delta doesn't changes then mark it as fixed delta
+        if (i > 0 && isFixedDelta) {
+          if (literals[i] - literals[i - 1] != currDelta) {
+            isFixedDelta = false;
+          }
+
+          fixedDelta = currDelta;
+        }
+
+        // store the minimum value among zigzag encoded values. The min
+        // value (base) will be removed in patched base encoding
+        long zzEncVal = 0;
+        if (signed) {
+          zzEncVal = SerializationUtils.zigzagEncode(literals[i]);
+        } else {
+          zzEncVal = literals[i];
+        }
+        if (zzEncVal < zzMin) {
+          zzMin = zzEncVal;
+        }
+        zigzagLiterals[i] = zzEncVal;
+
+        // max delta value is required for computing the fixed bits
+        // required for delta blob in delta encoding
+        if (i > 0) {
+          if (i == 1) {
+            // first value preserve the sign
+            adjDeltas[i - 1] = literals[i] - literals[i - 1];
+          } else {
+            adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
+            if (adjDeltas[i - 1] > deltaMax) {
+              deltaMax = adjDeltas[i - 1];
+            }
+          }
+        }
+      }
+
+      // stores the number of bits required for packing delta blob in
+      // delta encoding
+      bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+
+      // if decreasing count equals total number of literals then the
+      // sequence is monotonically decreasing
+      if (increasingCount == 1 && decreasingCount == numLiterals) {
+        isDecreasing = true;
+      }
+
+      // if increasing count equals total number of literals then the
+      // sequence is monotonically increasing
+      if (decreasingCount == 1 && increasingCount == numLiterals) {
+        isIncreasing = true;
+      }
+    }
+
+    // if the sequence is both increasing and decreasing then it is not
+    // monotonic
+    if (isDecreasing && isIncreasing) {
+      isDecreasing = false;
+      isIncreasing = false;
+    }
+
+    // fixed delta condition
+    if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) {
+      encoding = EncodingType.DELTA;
+      return;
+    }
+
+    // monotonic condition
+    if (isIncreasing || isDecreasing) {
+      encoding = EncodingType.DELTA;
+      return;
+    }
+
+    // percentile values are computed for the zigzag encoded values. if the
+    // number of bit requirement between 90th and 100th percentile varies
+    // beyond a threshold then we need to patch the values. if the variation
+    // is not significant then we can use direct or delta encoding
+
+    double p = 0.9;
+    zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+    p = 1.0;
+    zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+    int diffBitsLH = zzBits100p - zzBits90p;
+
+    // if the difference between 90th percentile and 100th percentile fixed
+    // bits is > 1 then we need patch the values
+    if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1
+        && isFixedDelta == false) {
+      // patching is done only on base reduced values. The base written to
+      // the stream is zzMin, so it has to be removed from the zigzag values;
+      // subtracting the raw minimum from the raw literals cannot be undone
+      // from zzMin for signed data.
+      // NOTE(review): confirm the reader adds the base back before zigzag
+      // decoding signed values.
+      for (int i = 0; i < zigzagLiterals.length; i++) {
+        baseRedLiterals[i] = zigzagLiterals[i] - zzMin;
+      }
+
+      // 95th percentile width is used to determine max allowed value
+      // after which patching will be done
+      p = 0.95;
+      brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+      // 100th percentile is used to compute the max patch width
+      p = 1.0;
+      brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+      // after base reducing the values, if the difference in bits between
+      // 95th percentile and 100th percentile value is zero then there
+      // is no point in patching the values, in which case we will
+      // fallback to DIRECT encoding.
+      // The decision to use patched base was based on zigzag values, but the
+      // actual patching is done on base reduced literals.
+      if ((brBits100p - brBits95p) != 0) {
+        encoding = EncodingType.PATCHED_BASE;
+        preparePatchedBlob();
+        return;
+      } else {
+        encoding = EncodingType.DIRECT;
+        return;
+      }
+    }
+
+    // if difference in bits between 90th percentile and 100th percentile is
+    // at most 1 then patching is not worthwhile. Hence we will fallback to
+    // direct
+    if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
+        && isFixedDelta == false) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+
+    // this should not happen
+    if (encoding == null) {
+      throw new RuntimeException("Integer encoding cannot be determined.");
+    }
+  }
+
+  /**
+   * Builds the patch list for PATCHED_BASE encoding. Every base reduced value
+   * that does not fit in brBits95p bits is truncated to that width; the bits
+   * cut off (the patch) and the distance to the previously patched position
+   * (the gap) are recorded in gapVsPatchList. Also sets patchWidth,
+   * patchGapWidth and patchLength.
+   */
+  private void preparePatchedBlob() {
+    // mask will be max value beyond which patch will be generated. The shift
+    // is done on a long: an int shift wraps for widths of 31 bits and above
+    long mask = (1L << brBits95p) - 1;
+
+    // since we are considering only 95 percentile, the size of gap and
+    // patch array can contain only be 5% values
+    patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+    int[] gapList = new int[patchLength];
+    long[] patchList = new long[patchLength];
+
+    // #bit for patch
+    patchWidth = brBits100p - brBits95p;
+    patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+
+    int gapIdx = 0;
+    int patchIdx = 0;
+    int prev = 0;
+    int gap = 0;
+    int maxGap = 0;
+
+    for (int i = 0; i < baseRedLiterals.length; i++) {
+      // if value is above mask then create the patch and record the gap
+      if (baseRedLiterals[i] > mask) {
+        gap = i - prev;
+        if (gap > maxGap) {
+          maxGap = gap;
+        }
+
+        // gaps are relative, so store the previous patched value
+        prev = i;
+        gapList[gapIdx++] = gap;
+
+        // extract the most significant bits that are over mask
+        long patch = baseRedLiterals[i] >>> brBits95p;
+        patchList[patchIdx++] = patch;
+
+        // strip off the MSB to enable safe bit packing
+        baseRedLiterals[i] &= mask;
+      }
+    }
+
+    // adjust the patch length to number of entries in gap list
+    patchLength = gapIdx;
+
+    // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
+    if (maxGap == 0 && patchLength != 0) {
+      patchGapWidth = 1;
+    } else {
+      patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+    }
+
+    // special case: if a gap is greater than 255 then it needs 9 bits, but
+    // the header has only 3 bits to record the gap width (max 8 bits). To
+    // deal with this case the gap is split over additional entries:
+    // gap 255 => 0 for patch value
+    // actual gap - 255 => actual patch value
+    if (patchGapWidth > 8) {
+      patchGapWidth = 8;
+      // for gap = 511, we need two additional entries in patch list
+      if (maxGap == 511) {
+        patchLength += 2;
+      } else {
+        patchLength += 1;
+      }
+    }
+
+    // create gap vs patch list
+    gapIdx = 0;
+    patchIdx = 0;
+    gapVsPatchList = new long[patchLength];
+    for (int i = 0; i < patchLength; i++) {
+      long g = gapList[gapIdx++];
+      long p = patchList[patchIdx++];
+      while (g > 255) {
+        // filler entry: maximum gap with an empty patch. Shifted as a long
+        // so that patch widths of 24 bits and above do not overflow an int
+        gapVsPatchList[i++] = (255L << patchWidth);
+        g -= 255;
+      }
+
+      // store patch value in LSBs and gap in MSBs
+      gapVsPatchList[i] = (g << patchWidth) | p;
+    }
+  }
+
+  /**
+   * Resets the per-run encoder state. fixedRunLength and variableRunLength
+   * are not touched here.
+   */
+  private void clear() {
+    // buffer bookkeeping
+    numLiterals = 0;
+    encoding = null;
+
+    // delta tracking
+    prevDelta = 0;
+    fixedDelta = 0;
+    isFixedDelta = false;
+    bitsDeltaMax = 0;
+
+    // scratch arrays
+    zigzagLiterals = null;
+    baseRedLiterals = null;
+    adjDeltas = null;
+    gapVsPatchList = null;
+
+    // percentile widths
+    zzBits90p = zzBits100p = 0;
+    brBits95p = brBits100p = 0;
+
+    // patched base description
+    patchWidth = patchGapWidth = patchLength = 0;
+    zzMin = 0;
+  }
+
+  /**
+   * Writes out whatever is still buffered and flushes the output stream.
+   *
+   * @throws IOException
+   *           if writing to or flushing the output stream fails
+   */
+  public void flush() throws IOException {
+    // a single leftover element is handed to the short repeat writer
+    if (numLiterals == 1) {
+      encoding = EncodingType.SHORT_REPEAT;
+      fixedRunLength = 1;
+      writeValues();
+    }
+
+    // pending variable run: pick the encoding and write the buffer
+    if (numLiterals != 0 && variableRunLength != 0) {
+      determineEncoding();
+      writeValues();
+    }
+
+    // pending fixed run
+    if (numLiterals != 0 && fixedRunLength != 0) {
+      if (fixedRunLength >= MIN_REPEAT) {
+        encoding = EncodingType.SHORT_REPEAT;
+      } else {
+        // too short for a repeat; treat it as a variable run
+        variableRunLength = fixedRunLength;
+        fixedRunLength = 0;
+        determineEncoding();
+      }
+      writeValues();
+    }
+
+    output.flush();
+  }
+
+  /**
+   * Buffers one value, tracking whether the buffered values form a run of
+   * identical values (fixed run) or of varying values (variable run), and
+   * writes a run to the output once it is complete or reaches MAX_SCOPE.
+   *
+   * @param val
+   *          the value to write
+   * @throws IOException
+   *           if writing a completed run to the output stream fails
+   */
+  public void write(long val) throws IOException {
+    if (numLiterals == 0) {
+      initializeLiterals(val);
+    } else {
+      if (numLiterals == 1) {
+        prevDelta = val - literals[0];
+        literals[numLiterals++] = val;
+        // if both values are same count as fixed run else variable run
+        if (val == literals[0]) {
+          fixedRunLength = 2;
+          variableRunLength = 0;
+        } else {
+          fixedRunLength = 0;
+          variableRunLength = 2;
+        }
+      } else {
+        long currentDelta = val - literals[numLiterals - 1];
+        if (prevDelta == 0 && currentDelta == 0) {
+          // fixed delta run
+
+          literals[numLiterals++] = val;
+
+          // if variable run is non-zero then we are seeing repeating
+          // values at the end of variable run in which case keep
+          // updating variable and fixed runs
+          if (variableRunLength > 0) {
+            fixedRunLength = 2;
+          }
+          fixedRunLength += 1;
+
+          // if fixed run met the minimum condition and if variable
+          // run is non-zero then flush the variable run and shift the
+          // tail fixed runs to start of the buffer
+          if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+            numLiterals -= MIN_REPEAT;
+            variableRunLength -= MIN_REPEAT - 1;
+            // copy the tail fixed runs
+            long[] tailVals = new long[MIN_REPEAT];
+            System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+            // determine variable encoding and flush values
+            determineEncoding();
+            writeValues();
+
+            // shift tail fixed runs to beginning of the buffer
+            for (long l : tailVals) {
+              literals[numLiterals++] = l;
+            }
+          }
+
+          // if fixed runs reached max repeat length then write values
+          if (fixedRunLength == MAX_SCOPE) {
+            determineEncoding();
+            writeValues();
+          }
+        } else {
+          // variable delta run
+
+          // if fixed run length is non-zero and if satisfies the
+          // short repeat algorithm conditions then write the values
+          // as short repeats else determine the appropriate algorithm
+          // to persist the values
+          if (fixedRunLength >= MIN_REPEAT) {
+            if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+              encoding = EncodingType.SHORT_REPEAT;
+              writeValues();
+            } else {
+              determineEncoding();
+              writeValues();
+            }
+          }
+
+          // if fixed run length is below MIN_REPEAT and the current value
+          // differs from the previous one then treat it as a variable run.
+          // The fixedRunLength > 0 guard also keeps this from indexing
+          // literals[-1] right after the buffer was written out above.
+          if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+            if (val != literals[numLiterals - 1]) {
+              variableRunLength = fixedRunLength;
+              fixedRunLength = 0;
+            }
+          }
+
+          // after writing values re-initialize the variables
+          if (numLiterals == 0) {
+            initializeLiterals(val);
+          } else {
+            // keep updating variable run lengths
+            prevDelta = val - literals[numLiterals - 1];
+            literals[numLiterals++] = val;
+            variableRunLength += 1;
+
+            // if variable run length reach the max scope, write it
+            if (variableRunLength == MAX_SCOPE) {
+              determineEncoding();
+              writeValues();
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends a value to the literals buffer and marks it as the start of both
+   * a fixed run and a variable run of length one.
+   *
+   * @param val
+   *          the value to buffer
+   */
+  private void initializeLiterals(long val) {
+    literals[numLiterals] = val;
+    numLiterals += 1;
+    variableRunLength = 1;
+    fixedRunLength = 1;
+  }
+
+ /**
+ * Records the current write position: first the position of the underlying
+ * output stream, then the number of values buffered but not yet written.
+ *
+ * @param recorder
+ * receives the position entries
+ * @throws IOException
+ * if the output stream cannot report its position
+ */
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
index 67762b5..5dc8263 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
@@ -26,8 +26,16 @@
final class SerializationUtils {
+ // Bit widths supported by the bit packing routines. encodeBitWidth() and
+ // decodeBitWidth() map a width to and from the ordinal of its constant, so
+ // the declaration order must not change.
+ enum FixedBitSizes {
+ ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+ THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, TWENTY,
+ TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, TWENTYEIGHT, THIRTY,
+ THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+ }
+
// unused
- private SerializationUtils() {}
+ private SerializationUtils() {
+ }
static void writeVulong(OutputStream output, long value) throws IOException {
while (true) {
@@ -42,7 +50,7 @@ static void writeVulong(OutputStream output, long value) throws IOException {
}
static void writeVslong(OutputStream output, long value) throws IOException {
- writeVulong(output, (value << 1) ^ (value >> 63));
+ writeVulong(output, zigzagEncode(value));
}
@@ -63,12 +71,12 @@ static long readVulong(InputStream in) throws IOException {
static long readVslong(InputStream in) throws IOException {
long result = readVulong(in);
- return (result >>> 1) ^ -(result & 1);
+ return zigzagDecode(result);
}
static float readFloat(InputStream in) throws IOException {
int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
- (in.read() << 24);
+ (in.read() << 24);
return Float.intBitsToFloat(ser);
}
@@ -81,19 +89,19 @@ static void writeFloat(OutputStream output, float value) throws IOException {
}
static double readDouble(InputStream in) throws IOException {
- long ser = (long) in.read() |
- ((long) in.read() << 8) |
- ((long) in.read() << 16) |
- ((long) in.read() << 24) |
- ((long) in.read() << 32) |
- ((long) in.read() << 40) |
- ((long) in.read() << 48) |
- ((long) in.read() << 56);
+ long ser = (long) in.read() |
+ ((long) in.read() << 8) |
+ ((long) in.read() << 16) |
+ ((long) in.read() << 24) |
+ ((long) in.read() << 32) |
+ ((long) in.read() << 40) |
+ ((long) in.read() << 48) |
+ ((long) in.read() << 56);
return Double.longBitsToDouble(ser);
}
static void writeDouble(OutputStream output,
- double value) throws IOException {
+ double value) throws IOException {
long ser = Double.doubleToLongBits(value);
output.write(((int) ser) & 0xff);
output.write(((int) (ser >> 8)) & 0xff);
@@ -107,19 +115,22 @@ static void writeDouble(OutputStream output,
/**
* Write the arbitrarily sized signed BigInteger in vint format.
- *
+ *
* Signed integers are encoded using the low bit as the sign bit using zigzag
* encoding.
- *
+ *
* Each byte uses the low 7 bits for data and the high bit for stop/continue.
- *
+ *
* Bytes are stored LSB first.
- * @param output the stream to write to
- * @param value the value to output
+ *
+ * @param output
+ * the stream to write to
+ * @param value
+ * the value to output
* @throws IOException
*/
static void writeBigInteger(OutputStream output,
- BigInteger value) throws IOException {
+ BigInteger value) throws IOException {
// encode the signed number as a positive integer
value = value.shiftLeft(1);
int sign = value.signum();
@@ -132,7 +143,7 @@ static void writeBigInteger(OutputStream output,
long lowBits = value.longValue() & 0x7fffffffffffffffL;
length -= 63;
// write out the next 63 bits worth of data
- for(int i=0; i < 9; ++i) {
+ for (int i = 0; i < 9; ++i) {
// if this is the last byte, leave the high bit off
if (length <= 0 && (lowBits & ~0x7f) == 0) {
output.write((byte) lowBits);
@@ -148,7 +159,9 @@ static void writeBigInteger(OutputStream output,
/**
* Read the signed arbitrary sized BigInteger BigInteger in vint format
- * @param input the stream to read from
+ *
+ * @param input
+ * the stream to read from
* @return the read BigInteger
* @throws IOException
*/
@@ -169,12 +182,12 @@ static BigInteger readBigInteger(InputStream input) throws IOException {
result = BigInteger.valueOf(work);
work = 0;
} else if (offset % 63 == 0) {
- result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63));
+ result = result.or(BigInteger.valueOf(work).shiftLeft(offset - 63));
work = 0;
}
} while (b >= 0x80);
if (work != 0) {
- result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63));
+ result = result.or(BigInteger.valueOf(work).shiftLeft((offset / 63) * 63));
}
// convert back to a signed number
boolean isNegative = result.testBit(0);
@@ -185,4 +198,299 @@ static BigInteger readBigInteger(InputStream input) throws IOException {
result = result.shiftRight(1);
return result;
}
+
+
+  /**
+   * Counts the number of bits required to encode the given value, treating
+   * it as unsigned, and rounds the count up to the closest supported fixed
+   * bit width.
+   *
+   * @param value
+   *          value to measure; a set top bit counts as the 64th bit
+   * @return closest supported bit width able to store value
+   */
+  static int findClosestNumBits(long value) {
+    int count = 0;
+    // compare against zero rather than testing the sign: the shift is
+    // unsigned, and a value with the top bit set is negative as a long but
+    // still needs all 64 bits
+    while (value != 0) {
+      count++;
+      value = value >>> 1;
+    }
+    return getClosestFixedBits(count);
+  }
+
+ /**
+ * zigzag encode the given value
+ *
+ * @param val
+ * @return zigzag encoded value
+ */
+ static long zigzagEncode(long val) {
+ return (val << 1) ^ (val >> 63);
+ }
+
+ /**
+ * zigzag decode the given value
+ *
+ * @param val
+ * @return zizag decoded value
+ */
+ static long zigzagDecode(long val) {
+ return (val >>> 1) ^ -(val & 1);
+ }
+
+ /**
+ * Compute the bits required to represent pth percentile value
+ *
+ * @param data
+ * - array
+ * @param p
+ * - percentile value (>=0.0 to <=1.0)
+ * @return pth percentile bits
+ */
+ static int percentileBits(long[] data, double p) {
+ if ((p > 1.0) || (p <= 0.0)) {
+ return -1;
+ }
+
+ // histogram that store the encoded bit requirement for each values.
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes)
+ int[] hist = new int[32];
+
+ // compute the histogram
+ for (long l : data) {
+ int idx = encodeBitWidth(findClosestNumBits(l));
+ hist[idx] += 1;
+ }
+
+ int len = data.length;
+ int perLen = (int) (len * (1.0 - p));
+
+ // return the bits required by pth percentile length
+ for (int i = hist.length - 1; i >= 0; i--) {
+ perLen -= hist[i];
+ if (perLen < 0) {
+ return decodeBitWidth(i);
+ }
+ }
+
+ return 0;
+ }
+
+  /**
+   * Reads n bytes in big endian order and converts them to a long.
+   *
+   * @param input
+   *          - stream to read from
+   * @param n
+   *          - number of bytes to read
+   * @return long value
+   * @throws IOException
+   *           if reading fails; an EOFException if the stream ends before n
+   *           bytes were read
+   */
+  static long bytesToLongBE(InStream input, int n) throws IOException {
+    long out = 0;
+    long val = 0;
+    while (n > 0) {
+      n--;
+      // store it in a long and then shift else integer overflow will occur
+      val = input.read();
+      if (val < 0) {
+        // -1 marks end of stream; OR-ing it in would set every bit
+        throw new java.io.EOFException("Unexpected end of stream while reading fixed width value");
+      }
+      out |= (val << (n * 8));
+    }
+    return out;
+  }
+
+ /**
+ * Calculate the number of bytes required
+ *
+ * @param n
+ * - number of values
+ * @param numBits
+ * - bit width
+ * @return number of bytes required
+ */
+ static int getTotalBytesRequired(int n, int numBits) {
+ return (n * numBits + 7) / 8;
+ }
+
+  /**
+   * For a given bit width this function returns the closest supported fixed
+   * bit width that is not smaller: every width from 1 to 24, even widths up
+   * to 32, multiples of 8 up to 56, and 64 for everything else.
+   *
+   * @param n
+   *          requested bit width
+   * @return closest valid fixed bit width
+   */
+  static int getClosestFixedBits(int n) {
+    if (n == 0) {
+      return 1;
+    }
+    if (n < 1 || n > 56) {
+      return 64;
+    }
+    if (n <= 24) {
+      return n;
+    }
+    if (n <= 32) {
+      // 26, 28, 30, 32: round up to the next even width
+      return n + (n & 1);
+    }
+    // 40, 48, 56: round up to the next multiple of 8
+    return ((n + 7) / 8) * 8;
+  }
+
+ /**
+ * Finds the closest available fixed bit width match and returns its encoded
+ * value (ordinal)
+ *
+ * @param n
+ * - fixed bit width to encode
+ * @return encoded fixed bit width
+ */
+ static int encodeBitWidth(int n) {
+ n = getClosestFixedBits(n);
+
+ if (n >= 1 && n <= 24) {
+ return n - 1;
+ } else if (n > 24 && n <= 26) {
+ return FixedBitSizes.TWENTYSIX.ordinal();
+ } else if (n > 26 && n <= 28) {
+ return FixedBitSizes.TWENTYEIGHT.ordinal();
+ } else if (n > 28 && n <= 30) {
+ return FixedBitSizes.THIRTY.ordinal();
+ } else if (n > 30 && n <= 32) {
+ return FixedBitSizes.THIRTYTWO.ordinal();
+ } else if (n > 32 && n <= 40) {
+ return FixedBitSizes.FORTY.ordinal();
+ } else if (n > 40 && n <= 48) {
+ return FixedBitSizes.FORTYEIGHT.ordinal();
+ } else if (n > 48 && n <= 56) {
+ return FixedBitSizes.FIFTYSIX.ordinal();
+ } else {
+ return FixedBitSizes.SIXTYFOUR.ordinal();
+ }
+ }
+
+ /**
+ * Decodes the ordinal fixed bit value to actual fixed bit width value
+ *
+ * @param n
+ * - encoded fixed bit width
+ * @return decoded fixed bit width
+ */
+ static int decodeBitWidth(int n) {
+ if (n >= FixedBitSizes.ONE.ordinal() && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+ return n + 1;
+ } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+ return 26;
+ } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+ return 28;
+ } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+ return 30;
+ } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+ return 32;
+ } else if (n == FixedBitSizes.FORTY.ordinal()) {
+ return 40;
+ } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+ return 48;
+ } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Bitpack and write the input values to underlying output stream. Values
+ * are packed most significant bit first; a partially filled last byte is
+ * written out with its unused low bits left as zero. Nothing is written
+ * when input is null or empty, offset is negative, or len or bitSize is
+ * less than 1.
+ *
+ * @param input
+ * - values to write
+ * @param offset
+ * - index of the first value to write
+ * @param len
+ * - number of values to write
+ * @param bitSize
+ * - bit width used for every value
+ * @param output
+ * - output stream
+ * @throws IOException
+ */
+ static void writeInts(long[] input, int offset, int len, int bitSize, OutputStream output)
+ throws IOException {
+ if (input == null || input.length < 1 || offset < 0 || len < 1 || bitSize < 1) {
+ return;
+ }
+
+ // free bits remaining in the byte currently being assembled
+ int bitsLeft = 8;
+ byte current = 0;
+ for (int i = offset; i < (offset + len); i++) {
+ long value = input[i];
+ int bitsToWrite = bitSize;
+ // the value does not fit in the current byte: fill it up with the top
+ // bits of the value, emit it and continue with the rest
+ while (bitsToWrite > bitsLeft) {
+ // add the bits to the bottom of the current word
+ current |= value >>> (bitsToWrite - bitsLeft);
+ // subtract out the bits we just added
+ bitsToWrite -= bitsLeft;
+ // zero out the bits above bitsToWrite
+ value &= (1L << bitsToWrite) - 1;
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ // the remaining bits fit: place them just below the bits already used
+ bitsLeft -= bitsToWrite;
+ current |= value << bitsLeft;
+ if (bitsLeft == 0) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ // flush
+ if (bitsLeft != 8) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+  /**
+   * Reads bitpacked integers from the input stream. Values are expected most
+   * significant bit first.
+   *
+   * @param buffer
+   *          - buffer receiving the decoded values
+   * @param offset
+   *          - index in buffer of the first decoded value
+   * @param len
+   *          - number of values to read
+   * @param bitSize
+   *          - bit width of every value
+   * @param input
+   *          - input stream
+   * @throws IOException
+   *           if reading fails; an EOFException if the stream ends before
+   *           all values were read
+   */
+  static void readInts(long[] buffer, int offset, int len, int bitSize, InStream input)
+      throws IOException {
+    // unread bits remaining in the low end of current
+    int bitsLeft = 0;
+    int current = 0;
+
+    for (int i = offset; i < (offset + len); i++) {
+      long result = 0;
+      int bitsLeftToRead = bitSize;
+      while (bitsLeftToRead > bitsLeft) {
+        // take everything that is left of the current byte, then fetch the
+        // next one
+        result <<= bitsLeft;
+        result |= current & ((1 << bitsLeft) - 1);
+        bitsLeftToRead -= bitsLeft;
+        current = input.read();
+        if (current < 0) {
+          // -1 marks end of stream; it must not be decoded as data bits
+          throw new java.io.EOFException(
+              "Unexpected end of stream while reading bit packed integers");
+        }
+        bitsLeft = 8;
+      }
+
+      // handle the left over bits
+      if (bitsLeftToRead > 0) {
+        result <<= bitsLeftToRead;
+        bitsLeft -= bitsLeftToRead;
+        result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+      }
+      buffer[i] = result;
+    }
+  }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 52defb9..64d9a9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -364,6 +364,7 @@ public boolean isCompressed() {
private final PositionedOutputStream rowIndexStream;
private boolean foundNulls;
private OutStream isPresentOutStream;
+ protected final boolean useDirectV2Encoding;
/**
* Create a tree writer.
@@ -379,6 +380,9 @@ public boolean isCompressed() {
this.isCompressed = streamFactory.isCompressed();
this.id = columnId;
this.inspector = inspector;
+ // use direct v2 encoding by default
+ // FIXME: should this be user configurable?
+ this.useDirectV2Encoding = true;
if (nullable) {
isPresentOutStream = streamFactory.createStream(id,
OrcProto.Stream.Kind.PRESENT);
@@ -493,6 +497,10 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
* @return the information about the encoding of this column
*/
OrcProto.ColumnEncoding getEncoding() {
+ if (useDirectV2Encoding) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DIRECT).build();
}
@@ -618,7 +626,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class IntegerTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter writer;
+ private final IntegerWriter writer;
private final ShortObjectInspector shortInspector;
private final IntObjectInspector intInspector;
private final LongObjectInspector longInspector;
@@ -630,7 +638,11 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ if (super.useDirectV2Encoding) {
+ this.writer = new RunLengthIntegerWriterV2(out, true);
+ } else {
+ this.writer = new RunLengthIntegerWriter(out, true);
+ }
if (inspector instanceof IntObjectInspector) {
intInspector = (IntObjectInspector) inspector;
shortInspector = null;
@@ -759,8 +771,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class StringTreeWriter extends TreeWriter {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final PositionedOutputStream stringOutput;
- private final RunLengthIntegerWriter lengthOutput;
- private final RunLengthIntegerWriter rowOutput;
+ private final IntegerWriter lengthOutput;
+ private final IntegerWriter rowOutput;
private final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
private final DynamicIntArray rows = new DynamicIntArray();
@@ -776,10 +788,17 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
- rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false);
+ if (super.useDirectV2Encoding == true) {
+ lengthOutput = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ } else {
+ lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ }
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
@@ -881,7 +900,7 @@ long estimateMemory() {
private static class BinaryTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
- private final RunLengthIntegerWriter length;
+ private final IntegerWriter length;
BinaryTreeWriter(int columnId,
ObjectInspector inspector,
@@ -890,8 +909,13 @@ long estimateMemory() {
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.length = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
+ if (super.useDirectV2Encoding) {
+ this.length = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ this.length = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -928,18 +952,25 @@ void recordPosition(PositionRecorder recorder) throws IOException {
Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
private static class TimestampTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter seconds;
- private final RunLengthIntegerWriter nanos;
+ private final IntegerWriter seconds;
+ private final IntegerWriter nanos;
TimestampTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
- this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true);
- this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false);
+ if (super.useDirectV2Encoding) {
+ this.seconds = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ } else {
+ this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -990,7 +1021,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class DecimalTreeWriter extends TreeWriter {
private final PositionedOutputStream valueStream;
- private final RunLengthIntegerWriter scaleStream;
+ private final IntegerWriter scaleStream;
DecimalTreeWriter(int columnId,
ObjectInspector inspector,
@@ -998,8 +1029,13 @@ void recordPosition(PositionRecorder recorder) throws IOException {
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true);
+ if (super.useDirectV2Encoding) {
+ this.scaleStream = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ } else {
+ this.scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ }
recordPosition(rowIndexPosition);
}
@@ -1076,7 +1112,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
}
private static class ListTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
ListTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1088,9 +1124,13 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
childrenWriters[0] =
createTreeWriter(listObjectInspector.getListElementObjectInspector(),
writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
+ if (super.useDirectV2Encoding) {
+ lengths = new RunLengthIntegerWriterV2(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ lengths = new RunLengthIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -1126,7 +1166,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class MapTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
MapTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1139,9 +1179,15 @@ void recordPosition(PositionRecorder recorder) throws IOException {
createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
childrenWriters[1] =
createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ if (super.useDirectV2Encoding) {
+ lengths =
+ new RunLengthIntegerWriterV2(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index d19bc06..994be70 100644
--- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -66,6 +66,7 @@ message ColumnEncoding {
enum Kind {
DIRECT = 0;
DICTIONARY = 1;
+ DIRECT_V2 = 2;
}
required Kind kind = 1;
optional uint32 dictionarySize = 2;
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
index 4e2c59f..1ad8661 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
@@ -128,7 +128,7 @@ public void testSkips() throws Exception {
BitFieldReader in = new BitFieldReader(InStream.create
("test", inBuf, null, 100), 1);
for(int i=0; i < COUNT; i += 5) {
- int x = (int) in.next();
+ int x = in.next();
if (i < COUNT/2) {
assertEquals(i & 1, x);
} else {
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
new file mode 100644
index 0000000..39ab9df
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+public class TestBitPack {
+
+ private static final int SIZE = 100;
+ private static Random rand = new Random(100);
+
+  /**
+   * Zigzag encodes every element of the given array. Despite its name this
+   * helper computes no deltas: each value is passed to
+   * SerializationUtils.zigzagEncode on its own.
+   *
+   * @param inp - values to encode
+   * @return a new array of the same length holding the encoded values
+   */
+  private long[] deltaEncode(long[] inp) {
+    long[] encoded = new long[inp.length];
+    int pos = 0;
+    for (long value : inp) {
+      encoded[pos++] = SerializationUtils.zigzagEncode(value);
+    }
+    return encoded;
+  }
+
+  /**
+   * Draws a non-negative pseudo random long from the given generator and
+   * reduces it modulo n, retrying whenever the overflow check below rejects
+   * the draw.
+   *
+   * @param rng - source of random bits
+   * @param n - modulus applied to each draw
+   * @return the first accepted draw reduced modulo n
+   */
+  private long nextLong(Random rng, long n) {
+    while (true) {
+      // clearing the sign bit is the same as (x << 1) >>> 1
+      long raw = rng.nextLong() & Long.MAX_VALUE;
+      long candidate = raw % n;
+      // accept unless the sum wrapped around to a negative value
+      if (raw - candidate + (n - 1) >= 0L) {
+        return candidate;
+      }
+    }
+  }
+
+ private void runTest(int numBits) throws IOException {
+ long[] inp = new long[SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ long val = 0;
+ if (numBits <= 32) {
+ if(numBits == 1) {
+ val = -1 * rand.nextInt(2);
+ } else {
+ val = rand.nextInt((int) Math.pow(2, numBits - 1));
+ }
+ } else {
+ val = nextLong(rand, (long) Math.pow(2, numBits - 2));
+ }
+ if (val % 2 == 0) {
+ val = -val;
+ }
+ inp[i] = val;
+ }
+ long[] deltaEncoded = deltaEncode(inp);
+ long minInput = Collections.min(Longs.asList(deltaEncoded));
+ long maxInput = Collections.max(Longs.asList(deltaEncoded));
+ long rangeInput = maxInput - minInput;
+ int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ OutStream output = new OutStream("test", SIZE, null, collect);
+ SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output);
+ output.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ long[] buff = new long[SIZE];
+ SerializationUtils.readInts(buff, 0, SIZE, fixedWidth, InStream.create
+ ("test", inBuf, null, SIZE));
+ for(int i=0; i= 0; --i) {
+ in.seek(positions[i]);
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ }
+
+ /** Runs the seek test without a compression codec (null is passed to runSeekTest). */
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ /** Runs the seek test with a newly created ZlibCodec passed to runSeekTest. */
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+  /**
+   * Writes 2048 values (i for the first 1024, 256 * i for the rest), then
+   * reads back every tenth value and skips the nine values in between.
+   */
+  @Test
+  public void testSkips() throws Exception {
+    TestInStream.OutputCollector sink = new TestInStream.OutputCollector();
+    OutStream rawOut = new OutStream("test", 100, null, sink);
+    RunLengthIntegerWriterV2 encoder = new RunLengthIntegerWriterV2(rawOut, true);
+    for (int v = 0; v < 2048; ++v) {
+      encoder.write(v < 1024 ? v : 256 * v);
+    }
+    encoder.flush();
+
+    // copy the collected bytes into a buffer the reader can consume
+    ByteBuffer encoded = ByteBuffer.allocate(sink.buffer.size());
+    sink.buffer.setByteBuffer(encoded, 0, sink.buffer.size());
+    encoded.flip();
+
+    InStream rawIn = InStream.create("test", encoded, null, 100);
+    RunLengthIntegerReaderV2 decoder = new RunLengthIntegerReaderV2(rawIn, true);
+    for (int v = 0; v < 2048; v += 10) {
+      int actual = (int) decoder.next();
+      int expected = (v < 1024) ? v : 256 * v;
+      assertEquals(expected, actual);
+      // after the value at 2040 fewer than nine values remain, so do not skip
+      if (v < 2038) {
+        decoder.skip(9);
+      }
+      decoder.skip(0);
+    }
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
new file mode 100644
index 0000000..f7ea0c6
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+public class TestNewIntegerEncoding {
+
+  /** Fixed seed so that a failure on random data can be reproduced. */
+  private static final long RANDOM_SEED = 100L;
+
+  /** Two-column row (int, long) written by {@link #testBasicRow()}. */
+  public static class Row {
+    Integer int1;
+    Long long1;
+
+    public Row(int val, long l) {
+      this.int1 = val;
+      this.long1 = l;
+    }
+  }
+
+  /**
+   * Reads a text file that holds one integer per line.
+   *
+   * @param path
+   *          - file to read
+   * @return the parsed values in file order; a line that is not a valid long
+   *         is recorded as 0
+   * @throws IOException
+   */
+  public List<Long> fetchData(String path) throws IOException {
+    List<Long> input = new ArrayList<Long>();
+    FileInputStream stream = new FileInputStream(new File(path));
+    try {
+      FileChannel fc = stream.getChannel();
+      MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+      // decode with a fixed charset instead of the platform default
+      String[] lines = Charset.forName("UTF-8").decode(bb).toString()
+          .split("\n");
+      for (String line : lines) {
+        long val = 0;
+        try {
+          // trim so that CRLF line endings do not turn every value into 0
+          val = Long.parseLong(line.trim());
+        } catch (NumberFormatException ignored) {
+          // best effort: an unparsable line is kept as 0
+        }
+        input.add(val);
+      }
+    } finally {
+      stream.close();
+    }
+    return input;
+  }
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target"
+      + File.separator + "test" + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  String resDir = "ql/src/test/resources";
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile."
+        + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  /**
+   * Builds a reflection based inspector for {@code clazz}. The call is made
+   * while holding the TestOrcFile.class monitor, as every test did before.
+   */
+  private static ObjectInspector reflectionInspector(Class<?> clazz) {
+    synchronized (TestOrcFile.class) {
+      return ObjectInspectorFactory.getReflectionObjectInspector(
+          clazz, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+  }
+
+  /** Writes every value of {@code input} as one row of a single long column. */
+  private void writeLongs(List<Long> input, CompressionKind compression)
+      throws Exception {
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf,
+        reflectionInspector(Long.class), 100000, compression, 10000, 10000);
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+  }
+
+  /**
+   * Reads the test file from {@code startRow} to the end and compares each
+   * value with {@code input}. Finally checks that the last expected row was
+   * reached, so a reader that returns too few rows fails the test instead of
+   * letting it pass vacuously.
+   */
+  private void verifyLongs(List<Long> input, int startRow) throws Exception {
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    if (startRow > 0) {
+      rows.seekToRow(startRow);
+    }
+    int idx = startRow;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+    assertEquals(input.size(), idx);
+  }
+
+  /** Writes {@code input} with the given compression and verifies every row. */
+  private void roundTrip(List<Long> input, CompressionKind compression)
+      throws Exception {
+    writeLongs(input, compression);
+    verifyLongs(input, 0);
+  }
+
+  /** Round trips the given values through an uncompressed file. */
+  private void roundTrip(long... values) throws Exception {
+    roundTrip(Lists.newArrayList(Longs.asList(values)), CompressionKind.NONE);
+  }
+
+  /** Returns {@code count} reproducible pseudo random ints widened to long. */
+  private static List<Long> randomInts(int count) {
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random(RANDOM_SEED);
+    for (int i = 0; i < count; i++) {
+      input.add((long) rand.nextInt());
+    }
+    return input;
+  }
+
+  /**
+   * Returns 5120 reproducible pseudo random values in [0, 100) where the
+   * element at {@code outlierIndex} is replaced by the outlier 20000.
+   */
+  private static List<Long> smallValuesWithOutlier(int outlierIndex) {
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random(RANDOM_SEED);
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(outlierIndex, 20000L);
+    return input;
+  }
+
+  @Test
+  public void testBasicRow() throws Exception {
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf,
+        reflectionInspector(Row.class), 100000, CompressionKind.NONE, 10000, 10000);
+    writer.addRow(new Row(111, 1111L));
+    writer.addRow(new Row(111, 1111L));
+    writer.addRow(new Row(111, 1111L));
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int rowCount = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0));
+      assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1));
+      rowCount++;
+    }
+    assertEquals(3, rowCount);
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    roundTrip(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+        7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+        2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+        9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+        1, 1, 1, 1);
+  }
+
+  @Test
+  public void testBasicDelta1() throws Exception {
+    roundTrip(-500, -400, -350, -325, -310);
+  }
+
+  @Test
+  public void testBasicDelta2() throws Exception {
+    roundTrip(-500, -600, -650, -675, -710);
+  }
+
+  @Test
+  public void testBasicDelta3() throws Exception {
+    roundTrip(500, 400, 350, 325, 310);
+  }
+
+  @Test
+  public void testBasicDelta4() throws Exception {
+    roundTrip(500, 600, 650, 675, 710);
+  }
+
+  @Test
+  public void testIntegerMin() throws Exception {
+    roundTrip(Longs.asList((long) Integer.MIN_VALUE), CompressionKind.ZLIB);
+  }
+
+  @Test
+  public void testIntegerMax() throws Exception {
+    roundTrip(Integer.MAX_VALUE);
+  }
+
+  @Test
+  public void testLongMin() throws Exception {
+    roundTrip(Long.MIN_VALUE);
+  }
+
+  @Test
+  public void testLongMax() throws Exception {
+    roundTrip(Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testRandomInt() throws Exception {
+    roundTrip(randomInts(100000), CompressionKind.NONE);
+  }
+
+  @Test
+  public void testRandomLong() throws Exception {
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random(RANDOM_SEED);
+    for (int i = 0; i < 100000; i++) {
+      input.add(rand.nextLong());
+    }
+    roundTrip(input, CompressionKind.NONE);
+  }
+
+  @Test
+  public void testPatchedBaseAt0() throws Exception {
+    roundTrip(smallValuesWithOutlier(0), CompressionKind.NONE);
+  }
+
+  @Test
+  public void testPatchedBaseAt1() throws Exception {
+    roundTrip(smallValuesWithOutlier(1), CompressionKind.NONE);
+  }
+
+  @Test
+  public void testPatchedBaseAt255() throws Exception {
+    roundTrip(smallValuesWithOutlier(255), CompressionKind.ZLIB);
+  }
+
+  @Test
+  public void testPatchedBaseAt256() throws Exception {
+    roundTrip(smallValuesWithOutlier(256), CompressionKind.ZLIB);
+  }
+
+  @Test
+  public void testPatchedBase510() throws Exception {
+    roundTrip(smallValuesWithOutlier(510), CompressionKind.ZLIB);
+  }
+
+  @Test
+  public void testPatchedBase511() throws Exception {
+    roundTrip(smallValuesWithOutlier(511), CompressionKind.ZLIB);
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    List<Long> input = randomInts(100000);
+    writeLongs(input, CompressionKind.NONE);
+    verifyLongs(input, 55555);
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 4153a61..2f0dc17 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -18,6 +18,21 @@
package org.apache.hadoop.hive.ql.io.orc;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,20 +66,6 @@
import org.junit.Test;
import org.junit.rules.TestName;
-import java.io.File;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import static junit.framework.Assert.*;
-import static junit.framework.Assert.assertEquals;
-
/**
* Tests for the top level reader/streamFactory of ORC files.
*/
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
index 9f989fd..2f7a7f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.ql.io.orc;
import static junit.framework.Assert.assertEquals;
diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out
index 8b88931..fd5e617 100644
--- ql/src/test/resources/orc-file-dump.out
+++ ql/src/test/resources/orc-file-dump.out
@@ -11,73 +11,73 @@ Statistics:
Column 3: count: 21000 min: Darkness, max: worst
Stripes:
- Stripe: offset: 3 data: 69605 rows: 5000 tail: 72 index: 119
+ Stripe: offset: 3 data: 52886 rows: 4143 tail: 72 index: 119
Stream: column 0 section ROW_INDEX start: 3 length 10
Stream: column 1 section ROW_INDEX start: 13 length 35
Stream: column 2 section ROW_INDEX start: 48 length 39
Stream: column 3 section ROW_INDEX start: 87 length 35
- Stream: column 1 section DATA start: 122 length 22605
- Stream: column 2 section DATA start: 22727 length 43426
- Stream: column 3 section DATA start: 66153 length 3403
- Stream: column 3 section LENGTH start: 69556 length 38
- Stream: column 3 section DICTIONARY_DATA start: 69594 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stream: column 1 section DATA start: 122 length 16596
+ Stream: column 2 section DATA start: 16718 length 33174
+ Stream: column 3 section DATA start: 49892 length 2958
+ Stream: column 3 section LENGTH start: 52850 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 52875 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
- Stripe: offset: 69799 data: 69584 rows: 5000 tail: 73 index: 118
- Stream: column 0 section ROW_INDEX start: 69799 length 10
- Stream: column 1 section ROW_INDEX start: 69809 length 34
- Stream: column 2 section ROW_INDEX start: 69843 length 39
- Stream: column 3 section ROW_INDEX start: 69882 length 35
- Stream: column 1 section DATA start: 69917 length 22597
- Stream: column 2 section DATA start: 92514 length 43439
- Stream: column 3 section DATA start: 135953 length 3377
- Stream: column 3 section LENGTH start: 139330 length 38
- Stream: column 3 section DICTIONARY_DATA start: 139368 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stripe: offset: 53080 data: 63748 rows: 5000 tail: 73 index: 120
+ Stream: column 0 section ROW_INDEX start: 53080 length 10
+ Stream: column 1 section ROW_INDEX start: 53090 length 36
+ Stream: column 2 section ROW_INDEX start: 53126 length 39
+ Stream: column 3 section ROW_INDEX start: 53165 length 35
+ Stream: column 1 section DATA start: 53200 length 20029
+ Stream: column 2 section DATA start: 73229 length 40035
+ Stream: column 3 section DATA start: 113264 length 3526
+ Stream: column 3 section LENGTH start: 116790 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 116815 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
- Stripe: offset: 139574 data: 69570 rows: 5000 tail: 73 index: 120
- Stream: column 0 section ROW_INDEX start: 139574 length 10
- Stream: column 1 section ROW_INDEX start: 139584 length 36
- Stream: column 2 section ROW_INDEX start: 139620 length 39
- Stream: column 3 section ROW_INDEX start: 139659 length 35
- Stream: column 1 section DATA start: 139694 length 22594
- Stream: column 2 section DATA start: 162288 length 43415
- Stream: column 3 section DATA start: 205703 length 3390
- Stream: column 3 section LENGTH start: 209093 length 38
- Stream: column 3 section DICTIONARY_DATA start: 209131 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stripe: offset: 117021 data: 63769 rows: 5000 tail: 73 index: 120
+ Stream: column 0 section ROW_INDEX start: 117021 length 10
+ Stream: column 1 section ROW_INDEX start: 117031 length 36
+ Stream: column 2 section ROW_INDEX start: 117067 length 39
+ Stream: column 3 section ROW_INDEX start: 117106 length 35
+ Stream: column 1 section DATA start: 117141 length 20029
+ Stream: column 2 section DATA start: 137170 length 40035
+ Stream: column 3 section DATA start: 177205 length 3547
+ Stream: column 3 section LENGTH start: 180752 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 180777 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
- Stripe: offset: 209337 data: 69551 rows: 5000 tail: 72 index: 119
- Stream: column 0 section ROW_INDEX start: 209337 length 10
- Stream: column 1 section ROW_INDEX start: 209347 length 35
- Stream: column 2 section ROW_INDEX start: 209382 length 39
- Stream: column 3 section ROW_INDEX start: 209421 length 35
- Stream: column 1 section DATA start: 209456 length 22575
- Stream: column 2 section DATA start: 232031 length 43426
- Stream: column 3 section DATA start: 275457 length 3379
- Stream: column 3 section LENGTH start: 278836 length 38
- Stream: column 3 section DICTIONARY_DATA start: 278874 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
- Stripe: offset: 279079 data: 14096 rows: 1000 tail: 68 index: 120
- Stream: column 0 section ROW_INDEX start: 279079 length 10
- Stream: column 1 section ROW_INDEX start: 279089 length 36
- Stream: column 2 section ROW_INDEX start: 279125 length 39
- Stream: column 3 section ROW_INDEX start: 279164 length 35
- Stream: column 1 section DATA start: 279199 length 4529
- Stream: column 2 section DATA start: 283728 length 8690
- Stream: column 3 section DATA start: 292418 length 706
- Stream: column 3 section LENGTH start: 293124 length 38
- Stream: column 3 section DICTIONARY_DATA start: 293162 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stripe: offset: 180983 data: 63785 rows: 5000 tail: 73 index: 120
+ Stream: column 0 section ROW_INDEX start: 180983 length 10
+ Stream: column 1 section ROW_INDEX start: 180993 length 36
+ Stream: column 2 section ROW_INDEX start: 181029 length 39
+ Stream: column 3 section ROW_INDEX start: 181068 length 35
+ Stream: column 1 section DATA start: 181103 length 20029
+ Stream: column 2 section DATA start: 201132 length 40035
+ Stream: column 3 section DATA start: 241167 length 3563
+ Stream: column 3 section LENGTH start: 244730 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 244755 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
+ Stripe: offset: 244961 data: 23837 rows: 1857 tail: 68 index: 120
+ Stream: column 0 section ROW_INDEX start: 244961 length 10
+ Stream: column 1 section ROW_INDEX start: 244971 length 36
+ Stream: column 2 section ROW_INDEX start: 245007 length 39
+ Stream: column 3 section ROW_INDEX start: 245046 length 35
+ Stream: column 1 section DATA start: 245081 length 7439
+ Stream: column 2 section DATA start: 252520 length 14870
+ Stream: column 3 section DATA start: 267390 length 1370
+ Stream: column 3 section LENGTH start: 268760 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 268785 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY[35]
\ No newline at end of file