diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index f6acc00..cce1415 100644
--- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -5157,10 +5157,12 @@ public ColumnEncoding getDefaultInstanceForType() {
implements com.google.protobuf.ProtocolMessageEnum {
DIRECT(0, 0),
DICTIONARY(1, 1),
+ DIRECT_V2(2, 2),
;
public static final int DIRECT_VALUE = 0;
public static final int DICTIONARY_VALUE = 1;
+ public static final int DIRECT_V2_VALUE = 2;
public final int getNumber() { return value; }
@@ -5169,6 +5171,7 @@ public static Kind valueOf(int value) {
switch (value) {
case 0: return DIRECT;
case 1: return DICTIONARY;
+ case 2: return DIRECT_V2;
default: return null;
}
}
@@ -5199,7 +5202,7 @@ public Kind findValueByNumber(int number) {
}
private static final Kind[] VALUES = {
- DIRECT, DICTIONARY,
+ DIRECT, DICTIONARY, DIRECT_V2,
};
public static Kind valueOf(
@@ -10568,41 +10571,42 @@ void setMagic(com.google.protobuf.ByteString value) {
"nd\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Ki" +
"nd\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n" +
"\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004" +
- "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n\016Colum",
+ "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\240\001\n\016Colum",
"nEncoding\022C\n\004kind\030\001 \002(\01625.org.apache.had" +
"oop.hive.ql.io.orc.ColumnEncoding.Kind\022\026" +
- "\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006DIRECT" +
- "\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFooter\0229\n\007s" +
- "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" +
- ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap" +
- "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
- "g\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" +
- "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" +
- "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\260\001\n\004Kind",
- "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" +
- "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" +
- "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" +
- "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" +
- "\007DECIMAL\020\016\"x\n\021StripeInformation\022\016\n\006offse" +
- "t\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLeng" +
- "th\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014number" +
- "OfRows\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name" +
- "\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Footer\022\024\n\014head" +
- "erLength\030\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n",
- "\007stripes\030\003 \003(\01323.org.apache.hadoop.hive." +
- "ql.io.orc.StripeInformation\0225\n\005types\030\004 \003" +
- "(\0132&.org.apache.hadoop.hive.ql.io.orc.Ty" +
- "pe\022D\n\010metadata\030\005 \003(\01322.org.apache.hadoop" +
- ".hive.ql.io.orc.UserMetadataItem\022\024\n\014numb" +
- "erOfRows\030\006 \001(\004\022F\n\nstatistics\030\007 \003(\01322.org" +
- ".apache.hadoop.hive.ql.io.orc.ColumnStat" +
- "istics\022\026\n\016rowIndexStride\030\010 \001(\r\"\255\001\n\nPostS" +
- "cript\022\024\n\014footerLength\030\001 \001(\004\022F\n\013compressi" +
- "on\030\002 \001(\01621.org.apache.hadoop.hive.ql.io.",
- "orc.CompressionKind\022\034\n\024compressionBlockS" +
- "ize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001\022\016\n\005magic\030" +
- "\300> \001(\t*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004Z" +
- "LIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
+ "\n\016dictionarySize\030\002 \001(\r\"1\n\004Kind\022\n\n\006DIRECT" +
+ "\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\"\214\001\n\014St" +
+ "ripeFooter\0229\n\007streams\030\001 \003(\0132(.org.apache" +
+ ".hadoop.hive.ql.io.orc.Stream\022A\n\007columns" +
+ "\030\002 \003(\01320.org.apache.hadoop.hive.ql.io.or" +
+ "c.ColumnEncoding\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162" +
+ "+.org.apache.hadoop.hive.ql.io.orc.Type." +
+ "Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames",
+ "\030\003 \003(\t\"\260\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t" +
+ "\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022" +
+ "\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tT" +
+ "IMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020" +
+ "\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\"x\n\021StripeInfor" +
+ "mation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 " +
+ "\001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030" +
+ "\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetad" +
+ "ataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n" +
+ "\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontent",
+ "Length\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apac" +
+ "he.hadoop.hive.ql.io.orc.StripeInformati" +
+ "on\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.hi" +
+ "ve.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.or" +
+ "g.apache.hadoop.hive.ql.io.orc.UserMetad" +
+ "ataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatist" +
+ "ics\030\007 \003(\01322.org.apache.hadoop.hive.ql.io" +
+ ".orc.ColumnStatistics\022\026\n\016rowIndexStride\030" +
+ "\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001" +
+ "(\004\022F\n\013compression\030\002 \001(\01621.org.apache.had",
+ "oop.hive.ql.io.orc.CompressionKind\022\034\n\024co" +
+ "mpressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(" +
+ "\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind" +
+ "\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020" +
+ "\003"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
new file mode 100644
index 0000000..a023f92
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading integers.
+ */
+interface IntegerReader {
+
+ /**
+ * Seek to the position provided by the index.
+ * @param index position provider giving the target position
+ * @throws IOException
+ */
+ void seek(PositionProvider index) throws IOException;
+
+ /**
+ * Skip the specified number of values.
+ * @param numValues number of values to skip
+ * @throws IOException
+ */
+ void skip(long numValues) throws IOException;
+
+ /**
+ * Check if there are any more values left.
+ * @return true if there are more values to read
+ * @throws IOException
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return the next available value.
+ * @return the next value
+ * @throws IOException
+ */
+ long next() throws IOException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
new file mode 100644
index 0000000..17238e2
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+interface IntegerWriter {
+
+ /**
+ * Record the current position of the stream.
+ * @param recorder the position recorder to update
+ * @throws IOException
+ */
+ void getPosition(PositionRecorder recorder) throws IOException;
+
+ /**
+ * Write the given integer value.
+ * @param value the value to write
+ * @throws IOException
+ */
+ void write(long value) throws IOException;
+
+ /**
+ * Flush any buffered values to the underlying stream.
+ * @throws IOException
+ */
+ void flush() throws IOException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 06103e3..a85f9cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -114,14 +114,18 @@ public long getNext() {
protected final int columnId;
private BitFieldReader present = null;
protected boolean valuePresent = false;
+ protected boolean isDirectV2;
TreeReader(Path path, int columnId) {
this.path = path;
this.columnId = columnId;
+ // FIXME: make the default user configurable? startStripe() clears this
+ // when a stripe uses the old DIRECT encoding
+ this.isDirectV2 = true;
}
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId + " of " + path);
}
@@ -130,6 +134,9 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
void startStripe(Map streams,
List encoding
) throws IOException {
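+ // the writer records the chosen encoding per column per stripe, so the
+ // reader version must be re-derived from this stripe's metadata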
+ if (encoding.get(columnId).getKind() == OrcProto.ColumnEncoding.Kind.DIRECT) {
+ this.isDirectV2 = false;
+ }
checkEncoding(encoding.get(columnId));
InStream in = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.PRESENT));
@@ -263,7 +270,7 @@ void skipRows(long items) throws IOException {
}
private static class ShortTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
ShortTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -276,7 +283,11 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
@@ -307,7 +318,7 @@ void skipRows(long items) throws IOException {
}
private static class IntTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
IntTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -320,7 +331,11 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
@@ -351,7 +366,7 @@ void skipRows(long items) throws IOException {
}
private static class LongTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
LongTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -364,7 +379,11 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
@@ -489,7 +508,7 @@ void skipRows(long items) throws IOException {
private static class BinaryTreeReader extends TreeReader{
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
BinaryTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -503,9 +522,15 @@ void startStripe(Map streams,
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
- false);
+ if (isDirectV2) {
+ lengths = new RunLengthIntegerReaderV2(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
}
@Override
@@ -544,7 +569,7 @@ Object next(Object previous) throws IOException {
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
- for(int i=0; i < items; ++i) {
+ for (int i = 0; i < items; ++i) {
lengthToSkip += lengths.next();
}
stream.skip(lengthToSkip);
@@ -552,8 +577,8 @@ void skipRows(long items) throws IOException {
}
private static class TimestampTreeReader extends TreeReader{
- private RunLengthIntegerReader data;
- private RunLengthIntegerReader nanos;
+ private IntegerReader data = null;
+ private IntegerReader nanos = null;
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -564,10 +589,17 @@ void startStripe(Map streams,
List encodings
) throws IOException {
super.startStripe(streams, encodings);
- data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.DATA)), true);
- nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.SECONDARY)), false);
+ if (isDirectV2) {
+ data = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ } else {
+ data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ }
}
@Override
@@ -587,9 +619,12 @@ Object next(Object previous) throws IOException {
} else {
result = (Timestamp) previous;
}
- long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
+ long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
WriterImpl.MILLIS_PER_SECOND;
- int newNanos = parseNanos(nanos.next());
+ int newNanos = parseNanos(nanos.next());
// fix the rounding when we divided by 1000.
if (millis >= 0) {
millis += newNanos / 1000000;
@@ -623,7 +658,7 @@ void skipRows(long items) throws IOException {
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
- private RunLengthIntegerReader scaleStream;
+ private IntegerReader scaleStream = null;
DecimalTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -636,8 +671,13 @@ void startStripe(Map streams,
super.startStripe(streams, encodings);
valueStream = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
- scaleStream = new RunLengthIntegerReader(streams.get(
- new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ if (isDirectV2) {
+ scaleStream = new RunLengthIntegerReaderV2(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ } else {
+ scaleStream = new RunLengthIntegerReader(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ }
}
@Override
@@ -671,7 +711,7 @@ void skipRows(long items) throws IOException {
private DynamicByteArray dictionaryBuffer = null;
private int dictionarySize;
private int[] dictionaryOffsets;
- private RunLengthIntegerReader reader;
+ private IntegerReader reader = null;
StringTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -706,13 +746,18 @@ void startStripe(Map streams,
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = streams.get(name);
- RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ IntegerReader lenReader = null;
+ if (isDirectV2) {
+ lenReader = new RunLengthIntegerReaderV2(in, false);
+ } else {
+ lenReader = new RunLengthIntegerReader(in, false);
+ }
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
dictionaryOffsets = new int[dictionarySize + 1];
}
- for(int i=0; i < dictionarySize; ++i) {
+ for (int i = 0; i < dictionarySize; ++i) {
dictionaryOffsets[i] = offset;
offset += (int) lenReader.next();
}
@@ -721,7 +766,11 @@ void startStripe(Map streams,
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), false);
+ if (isDirectV2) {
+ reader = new RunLengthIntegerReaderV2(streams.get(name), false);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), false);
+ }
}
@Override
@@ -735,7 +784,8 @@ Object next(Object previous) throws IOException {
super.next(previous);
Text result = null;
if (valuePresent) {
- int entry = (int) reader.next();
+ int entry = (int) reader.next();
if (previous == null) {
result = new Text();
} else {
@@ -919,7 +969,7 @@ void skipRows(long items) throws IOException {
private static class ListTreeReader extends TreeReader {
private final TreeReader elementReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
ListTreeReader(Path path, int columnId,
List types,
@@ -972,8 +1022,13 @@ void startStripe(Map streams,
List encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ if (isDirectV2) {
+ lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ }
if (elementReader != null) {
elementReader.startStripe(streams, encodings);
}
@@ -983,7 +1038,7 @@ void startStripe(Map streams,
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long childSkip = 0;
- for(long i=0; i < items; ++i) {
+ for (long i = 0; i < items; ++i) {
childSkip += lengths.next();
}
elementReader.skipRows(childSkip);
@@ -993,7 +1048,7 @@ void skipRows(long items) throws IOException {
private static class MapTreeReader extends TreeReader {
private final TreeReader keyReader;
private final TreeReader valueReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
MapTreeReader(Path path,
int columnId,
@@ -1050,8 +1105,13 @@ void startStripe(Map streams,
List encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ if (isDirectV2) {
+ lengths = new RunLengthIntegerReaderV2(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ }
if (keyReader != null) {
keyReader.startStripe(streams, encodings);
}
@@ -1064,7 +1124,7 @@ void startStripe(Map streams,
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long childSkip = 0;
- for(long i=0; i < items; ++i) {
+ for (long i = 0; i < items; ++i) {
childSkip += lengths.next();
}
keyReader.skipRows(childSkip);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
index 2825c64..9c825eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
@@ -23,7 +23,7 @@
/**
* A reader that reads a sequence of integers.
* */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
private final InStream input;
private final boolean signed;
private final long[] literals =
@@ -71,11 +71,11 @@ private void readValues() throws IOException {
}
}
- boolean hasNext() throws IOException {
+ public boolean hasNext() throws IOException {
return used != numLiterals || input.available() > 0;
}
- long next() throws IOException {
+ public long next() throws IOException {
long result;
if (used == numLiterals) {
readValues();
@@ -88,7 +88,7 @@ long next() throws IOException {
return result;
}
- void seek(PositionProvider index) throws IOException {
+ public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
if (consumed != 0) {
@@ -104,7 +104,7 @@ void seek(PositionProvider index) throws IOException {
}
}
- void skip(long numValues) throws IOException {
+ public void skip(long numValues) throws IOException {
while (numValues > 0) {
if (used == numLiterals) {
readValues();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..2cdc241
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+
+/**
+ * A reader that reads a sequence of lightweight compressed integers. Refer
+ * to {@link RunLengthIntegerWriterV2} for a description of the various
+ * lightweight compression techniques.
+ */
+class RunLengthIntegerReaderV2 implements IntegerReader {
+ private final InStream input;
+ private final boolean signed;
+ private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+ private int numLiterals = 0;
+ private int used = 0;
+
+ RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException {
+ this.input = input;
+ this.signed = signed;
+ }
+
+ private void readValues() throws IOException {
+ // read the first 2 bits and determine the encoding type
+ int firstByte = input.read();
+ if (firstByte < 0) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ } else {
+ int enc = (firstByte >>> 6) & 0x03;
+ if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
+ readShortRepeatValues(firstByte);
+ } else if (EncodingType.DIRECT.ordinal() == enc) {
+ readDirectValues(firstByte);
+ } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
+ readPatchedBaseValues(firstByte);
+ } else {
+ readDeltaValues(firstByte);
+ }
+ }
+ }
+
+ private void readDeltaValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fb = (firstByte >>> 1) & 0x1f;
+ if (fb != 0) {
+ fb = SerializationUtils.decodeBitWidth(fb);
+ }
+
+ // extract the blob run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+
+ // read the first value stored as vint
+ long firstVal = 0;
+ if (signed) {
+ firstVal = SerializationUtils.readVslong(input);
+ } else {
+ firstVal = SerializationUtils.readVulong(input);
+ }
+
+ // store first value to result buffer
+ long prevVal = firstVal;
+ literals[numLiterals++] = firstVal;
+
+ // if fixed bits is 0 then all values have fixed delta
+ if (fb == 0) {
+ // read the fixed delta value stored as vint (deltas can be negative
+ // even if all numbers are positive)
+ long fd = SerializationUtils.readVslong(input);
+
+ // add fixed deltas to adjacent values
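+ // note: the left-hand index numLiterals++ is evaluated before the right
+ // hand side, so literals[numLiterals - 2] reads the value stored on the
+ // previous iteration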
+ for (int i = 0; i < len; i++) {
+ literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ }
+ } else {
+ long deltaBase = SerializationUtils.readVslong(input);
+ // add delta base and first value
+ literals[numLiterals++] = firstVal + deltaBase;
+ prevVal = literals[numLiterals - 1];
+ len -= 1;
+
+ // read the unpacked values, add each to the previous value and store
+ // the final value in the result buffer. if the delta base value is
+ // negative then it is a decreasing sequence else an increasing sequence
+ SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+ while (len > 0) {
+ if (deltaBase < 0) {
+ literals[numLiterals] = prevVal - literals[numLiterals];
+ } else {
+ literals[numLiterals] = prevVal + literals[numLiterals];
+ }
+ prevVal = literals[numLiterals];
+ len--;
+ numLiterals++;
+ }
+ }
+ }
+
+ private void readPatchedBaseValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = SerializationUtils.decodeBitWidth(fbo);
+
+ // extract the run length of data blob
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are always one off
+ len += 1;
+
+ // extract the number of bytes occupied by base
+ int thirdByte = input.read();
+ int bw = (thirdByte >>> 5) & 0x07;
+ // base width is one off
+ bw += 1;
+
+ // extract patch width
+ int pwo = thirdByte & 0x1f;
+ int pw = SerializationUtils.decodeBitWidth(pwo);
+
+ // read fourth byte and extract patch gap width
+ int fourthByte = input.read();
+ int pgw = (fourthByte >>> 5) & 0x07;
+ // patch gap width is one off
+ pgw += 1;
+
+ // extract the length of the patch list
+ int pl = fourthByte & 0x1f;
+
+ // read the next base width number of bytes to extract base value
+ long base = SerializationUtils.bytesToLongBE(input, bw);
+ long mask = (1L << ((bw * 8) - 1));
+ // if MSB of base value is 1 then base is negative value else positive
+ if ((base & mask) != 0) {
+ base = base & ~mask;
+ base = -base;
+ }
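+ // illustrative example: with bw = 1, a stored byte 0x85 decodes to
+ // base = -5, since the MSB carries the sign and the remaining bits
+ // hold the magnitude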
+
+ // unpack the data blob
+ long[] unpacked = new long[len];
+ SerializationUtils.readInts(unpacked, 0, len, fb, input);
+
+ // unpack the patch blob
+ long[] unpackedPatch = new long[pl];
+ SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input);
+
+ // apply the patch directly when decoding the packed data
+ int patchIdx = 0;
+ long currGap = 0;
+ long currPatch = 0;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ long actualGap = 0;
+
+ // special case: if the gap is >255 then the patch value will be 0.
+ // if the gap is <=255 then the patch value cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // unpack data blob, patch it (if required), add base to get final result
+ for (int i = 0; i < unpacked.length; i++) {
+ if (i == actualGap) {
+ // apply patch
+ long patchedVal = unpacked[i] | (currPatch << fb);
+
+ // add base to patched value and zigzag decode it
+ literals[numLiterals++] = base + patchedVal;
+
+ // increment the patch to point to next entry in patch list
+ patchIdx++;
+
+ if (patchIdx < pl) {
+ // read the next gap and patch
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ actualGap = 0;
+
+ // special case: if the gap is >255 then the patch will be 0.
+ // if the gap is <=255 then the patch cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // next gap is relative to the current gap
+ actualGap += i;
+ }
+ } else {
+ // no patching required. add base to unpacked value to get final value
+ literals[numLiterals++] = base + unpacked[i];
+ }
+ }
+
+ }
+
+ private void readDirectValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = SerializationUtils.decodeBitWidth(fbo);
+
+ // extract the run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are one off
+ len += 1;
+
+ // read the unpacked values and zigzag decode into the result buffer
+ SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+ if (signed) {
+ for (int i = 0; i < len; i++) {
+ literals[numLiterals] = SerializationUtils
+ .zigzagDecode(literals[numLiterals]);
+ numLiterals++;
+ }
+ } else {
+ numLiterals += len;
+ }
+ }
+
+ private void readShortRepeatValues(int firstByte) throws IOException {
+
+ // read the number of bytes occupied by the value
+ int size = (firstByte >>> 3) & 0x07;
+ // #bytes are one off
+ size += 1;
+
+ // read the run length
+ int len = firstByte & 0x07;
+ // run lengths are stored relative to MIN_REPEAT, so add it back
+ len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+ // read the repeated value which is stored using fixed bytes
+ long val = SerializationUtils.bytesToLongBE(input, size);
+
+ if (signed) {
+ val = SerializationUtils.zigzagDecode(val);
+ }
+
+ // repeat the value for length times
+ for (int i = 0; i < len; i++) {
+ literals[numLiterals++] = val;
+ }
+ }
+
+ public boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ public long next() throws IOException {
+ long result;
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues();
+ }
+ result = literals[used++];
+ return result;
+ }
+
+ public void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two
+ // parts
+ while (consumed > 0) {
+ numLiterals = 0;
+ readValues();
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ public void skip(long numValues) throws IOException {
+ while (numValues > 0) {
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues();
+ }
+ long consume = Math.min(numValues, numLiterals - used);
+ used += consume;
+ numValues -= consume;
+ }
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
index aaca0a1..0497a56 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
@@ -25,7 +25,7 @@
* repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
* literal vint values follow.
*/
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
static final int MIN_REPEAT_SIZE = 3;
static final int MAX_DELTA = 127;
static final int MIN_DELTA = -128;
@@ -71,12 +71,12 @@ private void writeValues() throws IOException {
}
}
- void flush() throws IOException {
+ public void flush() throws IOException {
writeValues();
output.flush();
}
- void write(long value) throws IOException {
+ public void write(long value) throws IOException {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
@@ -130,8 +130,9 @@ void write(long value) throws IOException {
}
}
- void getPosition(PositionRecorder recorder) throws IOException {
- output.getPosition(recorder);
- recorder.addPosition(numLiterals);
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
}
+
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..26215a0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
@@ -0,0 +1,814 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs lightweight compression over a sequence of integers.
+ *
+ * There are four types of lightweight integer compression
+ *
+ * - SHORT_REPEAT
+ * - DIRECT
+ * - PATCHED_BASE
+ * - DELTA
+ *
+ *
+ * The description and format for these types are as below:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *
+ * - 1 byte header
+ *
+ * - 2 bits for encoding type
+ * - 3 bits for bytes required for repeating value
+ * - 3 bits for repeat count (MIN_REPEAT + run length)
+ *
+ *
+ * - Blob - repeat value (fixed bytes)
+ *
+ *
+ *
+ * DIRECT: Used for random integer sequences whose bit requirement
+ * doesn't vary much.
+ *
+ * - 2 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * - Blob - fixed width * run length bits long
+ *
+ *
+ *
+ * PATCHED_BASE: Used for random integer sequences whose bit requirement
+ * varies beyond a threshold.
+ *
+ * - 4 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * 3rd byte
+ * - 3 bits for bytes required for base value
+ * - 5 bits for patch width
+ *
+ *
+ * 4th byte
+ * - 3 bits for patch gap width
+ * - 5 bits for patch length
+ *
+ *
+ * - Base value - base width * 8 bits
+ * - Data blob - fixed width * run length
+ * - Patch blob - (patch width + patch gap width) * patch length
+ *
+ *
+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values, or long repeated sequences.
+ *
+ * - 2 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * - Base value - encoded as varint
+ * - Delta base - encoded as varint
+ * - Delta blob - only positive values. monotonicity is decided based on
+ * the sign of the delta base
+ *
+ *
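+ * Illustrative example (not part of the format description above): an
+ * unsigned run of five copies of 10000 (0x2710, two bytes) encodes as
+ * SHORT_REPEAT with header byte 0x0A (type 00, bytes minus one 001,
+ * length minus MIN_REPEAT 010) followed by the big endian value bytes
+ * 0x27 0x10, three bytes in total.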
+ */
+class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+ public enum EncodingType {
+ SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+ }
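+ // the enum ordinal doubles as the 2 bit opcode stored in the top two
+ // bits of the first header byte (see getOpcode() and the reader's
+ // readValues())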
+
+ static final int MAX_SCOPE = 512;
+ static final int MIN_REPEAT = 3;
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ private static final int MAX_SHORT_REPEAT_SIZE = 8;
+ private long prevDelta = 0;
+ private int fixedRunLength = 0;
+ private int variableRunLength = 0;
+ private long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private EncodingType encoding;
+ private int numLiterals;
+ private long[] zigzagLiterals;
+ private long[] baseRedLiterals;
+ private long[] adjDeltas;
+ private long fixedDelta;
+ private int zzBits90p;
+ private int zzBits100p;
+ private int brBits95p;
+ private int brBits100p;
+ private int bitsDeltaMax;
+ private int patchWidth;
+ private int patchGapWidth;
+ private int patchLength;
+ private long[] gapVsPatchList;
+ private long min;
+ private boolean isFixedDelta;
+
+ RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+ this.output = output;
+ this.signed = signed;
+ clear();
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+
+ if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+ writeShortRepeatValues();
+ } else if (encoding.equals(EncodingType.DIRECT)) {
+ writeDirectValues();
+ } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+ writePatchedBaseValues();
+ } else {
+ writeDeltaValues();
+ }
+
+ // clear all the variables
+ clear();
+ }
+ }
+
+ private void writeDeltaValues() throws IOException {
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int efb = 0;
+
+ if (isFixedDelta) {
+ // if the fixed delta is 0 then the sequence is counted as fixed
+ // run length else as variable run length
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ // fixed width 0 is reserved for fixed delta runs, so a sequence that
+ // requires only 1 bit is encoded with an additional bit
+ if (fb == 1) {
+ fb = 2;
+ }
+ efb = SerializationUtils.encodeBitWidth(fb);
+ efb = efb << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = len & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first value from zigzag literal array
+ if (signed) {
+ SerializationUtils.writeVslong(output, literals[0]);
+ } else {
+ SerializationUtils.writeVulong(output, literals[0]);
+ }
+
+ // if delta is fixed then we don't need to store delta blob else store
+ // delta blob using fixed bit packing
+ if (isFixedDelta) {
+ // store the fixed delta using zigzag encoding. deltas can be negative
+ // even if all values are positive
+ SerializationUtils.writeVslong(output, fixedDelta);
+ } else {
+ // bit pack the delta blob
+ SerializationUtils.writeVslong(output, adjDeltas[0]);
+ SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb,
+ output);
+ }
+ }
+
+ private void writePatchedBaseValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = brBits95p;
+ int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ boolean isNegative = min < 0;
+ if (isNegative) {
+ min = -min;
+ }
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width. The additional bit is used to store the sign
+ // of the base value.
+ int baseWidth = SerializationUtils.findClosestNumBits(min) + 1;
+ int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ int bb = (baseBytes - 1) << 5;
+
+ // if the base value is negative then add 1 to MSB
+ if (isNegative) {
+ min |= (1L << ((baseBytes * 8) - 1));
+ }
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+
+ // fourth byte contains 3 bits for patch gap width and 5 bits for
+ // patch length
+ int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for (int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((min >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // bit pack the base reduced literals and write them out
+ int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
+ SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length,
+ closestFixedBits, output);
+
+ // write the patch blob
+ closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
+ + patchWidth);
+ SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length,
+ closestFixedBits, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private int getOpcode() {
+ return encoding.ordinal() << 6;
+ }
+
+ private void writeDirectValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of
+ // runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit pack the zigzag encoded literals and write them out
+ SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length,
+ zzBits100p, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
+ int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // use short repeat encoding only when the run length and value size
+ // are within limits and the values truly repeat (zero delta); otherwise
+ // choose a different algorithm
+ if (fixedRunLength >= MIN_REPEAT
+ && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH
+ && numBytesRepeatVal <= MAX_SHORT_REPEAT_SIZE && prevDelta == 0) {
+ // write encoding type in top 2 bits
+ int header = getOpcode();
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the payload (i.e. the repeat value) in big endian
+ for (int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ fixedRunLength = 0;
+ } else {
+ determineEncoding();
+ writeValues();
+ }
+ }
+
+ private void determineEncoding() {
+ // used for direct encoding
+ zigzagLiterals = new long[numLiterals];
+
+ // used for patched base encoding
+ baseRedLiterals = new long[numLiterals];
+
+ // used for delta encoding
+ adjDeltas = new long[numLiterals - 1];
+
+ int idx = 0;
+
+ // for identifying monotonic sequences
+ boolean isIncreasing = false;
+ int increasingCount = 1;
+ boolean isDecreasing = false;
+ int decreasingCount = 1;
+
+ // for identifying type of delta encoding
+ min = literals[0];
+ long max = literals[0];
+ isFixedDelta = true;
+ long currDelta = 0;
+
+ long deltaMax = 0;
+
+ // populate all variables to identify the encoding type
+ if (numLiterals >= 1) {
+ currDelta = literals[1] - literals[0];
+ for (int i = 0; i < numLiterals; i++) {
+ if (i > 0 && literals[i] >= max) {
+ max = literals[i];
+ increasingCount++;
+ }
+
+ if (i > 0 && literals[i] <= min) {
+ min = literals[i];
+ decreasingCount++;
+ }
+
+ // if the delta ever changes then it is no longer a fixed delta run
+ if (i > 0 && isFixedDelta) {
+ if (literals[i] - literals[i - 1] != currDelta) {
+ isFixedDelta = false;
+ }
+
+ fixedDelta = currDelta;
+ }
+
+ // zigzag encode the value. the min value (base) tracked above is
+ // removed from the raw values in patched base encoding
+ long zzEncVal = 0;
+ if (signed) {
+ zzEncVal = SerializationUtils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ zigzagLiterals[idx] = zzEncVal;
+ idx++;
+
+ // max delta value is required for computing the fixed bits
+ // required for delta blob in delta encoding
+ if (i > 0) {
+ if (i == 1) {
+ // first value preserve the sign
+ adjDeltas[i - 1] = literals[i] - literals[i - 1];
+ } else {
+ adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
+ if (adjDeltas[i - 1] > deltaMax) {
+ deltaMax = adjDeltas[i - 1];
+ }
+ }
+ }
+ }
+
+ // stores the number of bits required for packing delta blob in
+ // delta encoding
+ bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+
+ // if decreasing count equals total number of literals then the
+ // sequence is monotonically decreasing
+ if (increasingCount == 1 && decreasingCount == numLiterals) {
+ isDecreasing = true;
+ }
+
+ // if increasing count equals total number of literals then the
+ // sequence is monotonically increasing
+ if (decreasingCount == 1 && increasingCount == numLiterals) {
+ isIncreasing = true;
+ }
+ }
+
+ // if the sequence is both increasing and decreasing then it is not
+ // monotonic
+ if (isDecreasing && isIncreasing) {
+ isDecreasing = false;
+ isIncreasing = false;
+ }
+
+ // fixed delta condition
+ if (!isIncreasing && !isDecreasing && isFixedDelta) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ // monotonic condition
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ // percentile values are computed for the zigzag encoded values. if the
+ // number of bit requirement between 90th and 100th percentile varies
+ // beyond a threshold then we need to patch the values. if the variation
+ // is not significant then we can use direct or delta encoding
+
+ double p = 0.9;
+ zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+ p = 1.0;
+ zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+ int diffBitsLH = zzBits100p - zzBits90p;
+
+ // if the difference between 90th percentile and 100th percentile fixed
+ // bits is > 1 then we need to patch the values
+ if (!isIncreasing && !isDecreasing && diffBitsLH > 1
+ && !isFixedDelta) {
+ // patching is done only on base reduced values.
+ // remove base from literals
+ for (int i = 0; i < zigzagLiterals.length; i++) {
+ baseRedLiterals[i] = literals[i] - min;
+ }
+
+ // 95th percentile width is used to determine max allowed value
+ // after which patching will be done
+ p = 0.95;
+ brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+ // 100th percentile is used to compute the max patch width
+ p = 1.0;
+ brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+ // after base reducing the values, if the difference in bits between
+ // 95th percentile and 100th percentile value is zero then there
+ // is no point in patching the values, in which case we will
+ // fallback to DIRECT encoding.
+ // The decision to use patched base was based on zigzag values, but the
+ // actual patching is done on base reduced literals.
+ if ((brBits100p - brBits95p) != 0) {
+ encoding = EncodingType.PATCHED_BASE;
+ preparePatchedBlob();
+ return;
+ } else {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ }
+
+ // if the difference in bits between the 95th percentile and the 100th
+ // percentile is 0, then the patch length will become 0. Hence we will
+ // fall back to direct encoding
+ if (!isIncreasing && !isDecreasing && diffBitsLH <= 1
+ && !isFixedDelta) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // this should not happen
+ if (encoding == null) {
+ throw new RuntimeException("Integer encoding cannot be determined.");
+ }
+ }
+
+ private void preparePatchedBlob() {
+ // mask will be max value beyond which patch will be generated
+ int mask = (1 << brBits95p) - 1;
+
+ // since we are considering only the 95th percentile, the gap and
+ // patch arrays can hold at most 5% of the values
+ patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+ int[] gapList = new int[patchLength];
+ long[] patchList = new long[patchLength];
+
+ // #bit for patch
+ patchWidth = brBits100p - brBits95p;
+ patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+
+ int gapIdx = 0;
+ int patchIdx = 0;
+ int prev = 0;
+ int gap = 0;
+ int maxGap = 0;
+
+ for (int i = 0; i < baseRedLiterals.length; i++) {
+ // if value is above mask then create the patch and record the gap
+ if (baseRedLiterals[i] > mask) {
+ gap = i - prev;
+ if (gap > maxGap) {
+ maxGap = gap;
+ }
+
+ // gaps are relative, so store the previous patched value
+ prev = i;
+ gapList[gapIdx++] = gap;
+
+ // extract the most significant bits that are over mask
+ long patch = baseRedLiterals[i] >>> brBits95p;
+ patchList[patchIdx++] = patch;
+
+ // strip off the MSB to enable safe bit packing
+ baseRedLiterals[i] &= mask;
+ }
+ }
+
+ // adjust the patch length to number of entries in gap list
+ patchLength = gapIdx;
+
+ // if the element to be patched is the first and only element then
+ // max gap will be 0, but to store the gap as 0 we need at least 1 bit
+ if (maxGap == 0 && patchLength != 0) {
+ patchGapWidth = 1;
+ } else {
+ patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+ }
+
+ // special case: if a patch gap is greater than 255, it would need more
+ // than the 8 bits the header's 3 bit gap width can describe. To deal
+ // with this case, the gap is split across multiple patch list entries:
+ //   gap of 255 => patch value 0
+ //   remaining gap => actual patch value
+ if (patchGapWidth > 8) {
+ patchGapWidth = 8;
+ // for gap = 511, we need two additional entries in patch list
+ if (maxGap == 511) {
+ patchLength += 2;
+ } else {
+ patchLength += 1;
+ }
+ }
+
+ // create gap vs patch list
+ gapIdx = 0;
+ patchIdx = 0;
+ gapVsPatchList = new long[patchLength];
+ for (int i = 0; i < patchLength; i++) {
+ long g = gapList[gapIdx++];
+ long p = patchList[patchIdx++];
+ while (g > 255) {
+ gapVsPatchList[i++] = (255L << patchWidth) | 0;
+ g -= 255;
+ }
+
+ // store patch value in LSBs and gap in MSBs
+ gapVsPatchList[i] = (g << patchWidth) | p;
+ }
+ }
+
+ /**
+ * clears all the variables
+ */
+ private void clear() {
+ numLiterals = 0;
+ encoding = null;
+ prevDelta = 0;
+ zigzagLiterals = null;
+ baseRedLiterals = null;
+ adjDeltas = null;
+ fixedDelta = 0;
+ zzBits90p = 0;
+ zzBits100p = 0;
+ brBits95p = 0;
+ brBits100p = 0;
+ bitsDeltaMax = 0;
+ patchGapWidth = 0;
+ patchLength = 0;
+ patchWidth = 0;
+ gapVsPatchList = null;
+ min = 0;
+ isFixedDelta = false;
+ }
+
+ public void flush() throws IOException {
+ // if only one element is left in buffer then use short repeat encoding
+ if (numLiterals == 1) {
+ encoding = EncodingType.SHORT_REPEAT;
+ fixedRunLength = 1;
+ writeValues();
+ }
+
+ // if variable runs are not 0 then determine the delta encoding to use
+ // and flush out the buffer
+ if (variableRunLength != 0 && numLiterals != 0) {
+ determineEncoding();
+ writeValues();
+ }
+
+ // if fixed runs are not 0 then flush out the buffer
+ if (fixedRunLength != 0 && numLiterals != 0) {
+ if (fixedRunLength < MIN_REPEAT) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ determineEncoding();
+ writeValues();
+ } else {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ }
+ }
+
+ output.flush();
+ }
+
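+ // note on the run tracking in write(): fixedRunLength counts a tail of
+ // equal values (zero delta) while variableRunLength counts everything
+ // else; whenever a run ends, satisfies the short repeat conditions, or
+ // reaches MAX_SCOPE, the buffered literals are encoded and flushed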
+ public void write(long val) throws IOException {
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ if (numLiterals == 1) {
+ prevDelta = val - literals[0];
+ literals[numLiterals++] = val;
+ // if both values are same count as fixed run else variable run
+ if (val == literals[0]) {
+ fixedRunLength = 2;
+ variableRunLength = 0;
+ } else {
+ fixedRunLength = 0;
+ variableRunLength = 2;
+ }
+ } else {
+ long currentDelta = val - literals[numLiterals - 1];
+ if (prevDelta == 0 && currentDelta == 0) {
+ // fixed delta run
+
+ literals[numLiterals++] = val;
+
+ // if variable run is non-zero then we are seeing repeating
+ // values at the end of variable run in which case keep
+ // updating variable and fixed runs
+ if (variableRunLength > 0) {
+ fixedRunLength = 2;
+ }
+ fixedRunLength += 1;
+
+ // if fixed run met the minimum condition and if variable
+ // run is non-zero then flush the variable run and shift the
+ // tail fixed runs to start of the buffer
+ if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+ numLiterals -= MIN_REPEAT;
+ variableRunLength -= MIN_REPEAT - 1;
+ // copy the tail fixed runs
+ long[] tailVals = new long[MIN_REPEAT];
+ System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+ // determine variable encoding and flush values
+ determineEncoding();
+ writeValues();
+
+ // shift tail fixed runs to beginning of the buffer
+ for (long l : tailVals) {
+ literals[numLiterals++] = l;
+ }
+ }
+
+ // if fixed runs reached max repeat length then write values
+ if (fixedRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ } else {
+ // variable delta run
+
+ // if fixed run length is non-zero and if satisfies the
+ // short repeat algorithm conditions then write the values
+ // as short repeats else determine the appropriate algorithm
+ // to persist the values
+ if (fixedRunLength >= MIN_REPEAT) {
+ if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ determineEncoding();
+ writeValues();
+ }
+ }
+
+ // if the fixed run is non-zero but shorter than MIN_REPEAT and the
+ // current value differs from the previous one, treat the pending
+ // fixed run as part of a variable run
+ if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+ if (val != literals[numLiterals - 1]) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ }
+ }
+
+ // after writing values re-initialize the variables
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // keep updating variable run lengths
+ prevDelta = val - literals[numLiterals - 1];
+ literals[numLiterals++] = val;
+ variableRunLength += 1;
+
+ // if variable run length reach the max scope, write it
+ if (variableRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void initializeLiterals(long val) {
+ literals[numLiterals++] = val;
+ fixedRunLength = 1;
+ variableRunLength = 1;
+ }
+
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
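
The write() method above is essentially a small state machine: zero deltas between consecutive values accumulate into a fixed run, anything else extends a variable run, and once MIN_REPEAT identical values appear at the tail of a variable run, the variable part is flushed and the repeating tail is shifted to the front of the buffer. A minimal standalone sketch of that segmentation rule (the class name, output format, and the MIN_REPEAT value of 3 are illustrative assumptions, not part of the patch):

    // Standalone illustration of the fixed/variable run split; assumes
    // MIN_REPEAT = 3, matching the writer's short-repeat threshold.
    public class RunSketch {
      private static final int MIN_REPEAT = 3;

      public static void main(String[] args) {
        long[] vals = {7, 7, 7, 7, 1, 2, 9, 9, 9};
        int i = 0;
        while (i < vals.length) {
          // measure the repeat starting at i
          int j = i + 1;
          while (j < vals.length && vals[j] == vals[i]) {
            j++;
          }
          if (j - i >= MIN_REPEAT) {
            // enough identical values: a fixed run (short-repeat candidate)
            System.out.println("fixed run: " + vals[i] + " x " + (j - i));
            i = j;
          } else {
            // grow a variable run until a qualifying repeat begins
            int start = i;
            while (i < vals.length) {
              int k = i + 1;
              while (k < vals.length && vals[k] == vals[i]) {
                k++;
              }
              if (k - i >= MIN_REPEAT) {
                break; // a fixed run starts here, close the variable run
              }
              i = k;
            }
            System.out.println("variable run of length " + (i - start));
          }
        }
      }
    }

On the sample input this prints a fixed run of four 7s, a variable run of length 2, and a fixed run of three 9s, which is the same split write() produces incrementally.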
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
index 67762b5..d08c1cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
@@ -185,4 +185,279 @@ static BigInteger readBigInteger(InputStream input) throws IOException {
result = result.shiftRight(1);
return result;
}
+
+ enum FixedBitSizes {
+ ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+ THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+ TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+ TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+ }
+
+ /**
+ * Count the number of bits required to encode the given value, rounded
+ * up to the closest supported fixed bit width
+ * @param value - value to encode
+ * @return closest fixed number of bits required to store the value
+ */
+ static int findClosestNumBits(long value) {
+ int count = 0;
+ while (value > 0) {
+ count++;
+ value = value >>> 1;
+ }
+ return getClosestFixedBits(count);
+ }
+
+ /**
+ * zigzag encode the given value
+ * @param val
+ * @return zigzag encoded value
+ */
+ static long zigzagEncode(long val) {
+ return (val << 1) ^ (val >> 63);
+ }
+
+ /**
+ * zigzag decode the given value
+ * @param val
+ * @return zigzag decoded value
+ */
+ static long zigzagDecode(long val) {
+ return (val >>> 1) ^ -(val & 1);
+ }
+
+ /**
+ * Compute the bits required to represent pth percentile value
+ * @param data - array
+ * @param p - percentile value (> 0.0 and <= 1.0)
+ * @return pth percentile bits
+ */
+ static int percentileBits(long[] data, double p) {
+ if ((p > 1.0) || (p <= 0.0)) {
+ return -1;
+ }
+
+ // histogram that stores the encoded bit requirement for each value;
+ // there are 32 possible fixed bit width codes (refer FixedBitSizes)
+ int[] hist = new int[32];
+
+ // compute the histogram
+ for (long l : data) {
+ int idx = encodeBitWidth(findClosestNumBits(l));
+ hist[idx] += 1;
+ }
+
+ int len = data.length;
+ int perLen = (int) (len * (1.0 - p));
+
+ // return the bits required by pth percentile length
+ for (int i = hist.length - 1; i >= 0; i--) {
+ perLen -= hist[i];
+ if (perLen < 0) {
+ return decodeBitWidth(i);
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Read n bytes in big endian order and convert to long
+ * @param input - input stream to read from
+ * @param n - number of bytes to read
+ * @return long value
+ */
+ static long bytesToLongBE(InStream input, int n) throws IOException {
+ long out = 0;
+ long val = 0;
+ while (n > 0) {
+ n--;
+ // store the byte in a long before shifting, else the int shift overflows
+ val = input.read();
+ out |= (val << (n * 8));
+ }
+ return out;
+ }
+
+ /**
+ * Calculate the number of bytes required to bit pack n values
+ * @param n - number of values
+ * @param numBits - bit width
+ * @return number of bytes required
+ */
+ static int getTotalBytesRequired(int n, int numBits) {
+ return (n * numBits + 7) / 8;
+ }
+
+ /**
+ * For a given bit width, return the closest supported fixed bit width,
+ * rounding up
+ * @param n - bit width
+ * @return closest valid fixed bit width
+ */
+ static int getClosestFixedBits(int n) {
+ if (n == 0) {
+ return 1;
+ }
+
+ if (n >= 1 && n <= 24) {
+ return n;
+ } else if (n > 24 && n <= 26) {
+ return 26;
+ } else if (n > 26 && n <= 28) {
+ return 28;
+ } else if (n > 28 && n <= 30) {
+ return 30;
+ } else if (n > 30 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Finds the closest available fixed bit width match and returns its encoded
+ * value (ordinal)
+ * @param n - fixed bit width to encode
+ * @return encoded fixed bit width
+ */
+ static int encodeBitWidth(int n) {
+ n = getClosestFixedBits(n);
+
+ if (n >= 1 && n <= 24) {
+ return n - 1;
+ } else if (n > 24 && n <= 26) {
+ return FixedBitSizes.TWENTYSIX.ordinal();
+ } else if (n > 26 && n <= 28) {
+ return FixedBitSizes.TWENTYEIGHT.ordinal();
+ } else if (n > 28 && n <= 30) {
+ return FixedBitSizes.THIRTY.ordinal();
+ } else if (n > 30 && n <= 32) {
+ return FixedBitSizes.THIRTYTWO.ordinal();
+ } else if (n > 32 && n <= 40) {
+ return FixedBitSizes.FORTY.ordinal();
+ } else if (n > 40 && n <= 48) {
+ return FixedBitSizes.FORTYEIGHT.ordinal();
+ } else if (n > 48 && n <= 56) {
+ return FixedBitSizes.FIFTYSIX.ordinal();
+ } else {
+ return FixedBitSizes.SIXTYFOUR.ordinal();
+ }
+ }
+
+ /**
+ * Decodes the ordinal fixed bit value to actual fixed bit width value
+ * @param n - encoded fixed bit width
+ * @return decoded fixed bit width
+ */
+ static int decodeBitWidth(int n) {
+ if (n >= FixedBitSizes.ONE.ordinal()
+ && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+ return n + 1;
+ } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+ return 26;
+ } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+ return 28;
+ } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+ return 30;
+ } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+ return 32;
+ } else if (n == FixedBitSizes.FORTY.ordinal()) {
+ return 40;
+ } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+ return 48;
+ } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Bitpack and write the input values to underlying output stream
+ * @param input - values to write
+ * @param offset - starting offset into input
+ * @param len - number of values to write
+ * @param bitSize - bit width of each value
+ * @param output - output stream
+ * @throws IOException
+ */
+ static void writeInts(long[] input, int offset, int len, int bitSize,
+ OutputStream output) throws IOException {
+ if (input == null || input.length < 1 || offset < 0 || len < 1
+ || bitSize < 1) {
+ return;
+ }
+
+ int bitsLeft = 8;
+ byte current = 0;
+ for (int i = offset; i < (offset + len); i++) {
+ long value = input[i];
+ int bitsToWrite = bitSize;
+ while (bitsToWrite > bitsLeft) {
+ // add the bits to the bottom of the current word
+ current |= value >>> (bitsToWrite - bitsLeft);
+ // subtract out the bits we just added
+ bitsToWrite -= bitsLeft;
+ // zero out the bits above bitsToWrite
+ value &= (1L << bitsToWrite) - 1;
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ bitsLeft -= bitsToWrite;
+ current |= value << bitsLeft;
+ if (bitsLeft == 0) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ // flush
+ if (bitsLeft != 8) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ /**
+ * Read bitpacked integers from input stream
+ * @param buffer - output buffer to read the values into
+ * @param offset - starting offset into buffer
+ * @param len - number of values to read
+ * @param bitSize - bit width of each value
+ * @param input - input stream to read from
+ * @throws IOException
+ */
+ static void readInts(long[] buffer, int offset, int len, int bitSize,
+ InStream input) throws IOException {
+ int bitsLeft = 0;
+ int current = 0;
+
+ for (int i = offset; i < (offset + len); i++) {
+ long result = 0;
+ int bitsLeftToRead = bitSize;
+ while (bitsLeftToRead > bitsLeft) {
+ result <<= bitsLeft;
+ result |= current & ((1 << bitsLeft) - 1);
+ bitsLeftToRead -= bitsLeft;
+ current = input.read();
+ bitsLeft = 8;
+ }
+
+ // handle the left over bits
+ if (bitsLeftToRead > 0) {
+ result <<= bitsLeftToRead;
+ bitsLeft -= bitsLeftToRead;
+ result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+ }
+ buffer[i] = result;
+ }
+ }
}
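
Taken together, zigzagEncode/zigzagDecode and writeInts/readInts form a complete round trip: signed values are first mapped to non-negative ones (so small magnitudes stay small), then packed MSB-first at a fixed bit width. The following self-contained sketch reproduces that round trip against a plain byte array instead of the ORC stream classes, so it compiles without the rest of the patch; all names here are illustrative:

    import java.io.ByteArrayOutputStream;

    // Round-trip sketch of the zigzag + fixed-width bit packing above;
    // pack/unpack mirror writeInts/readInts but use a plain byte array.
    public class ZigzagPackDemo {

      static long zigzagEncode(long val) {   // same formula as the patch
        return (val << 1) ^ (val >> 63);
      }

      static long zigzagDecode(long val) {   // same formula as the patch
        return (val >>> 1) ^ -(val & 1);
      }

      // pack each value into bitSize bits, most significant bits first
      static byte[] pack(long[] vals, int bitSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int bitsLeft = 8;
        int current = 0;
        for (long value : vals) {
          int bitsToWrite = bitSize;
          while (bitsToWrite > bitsLeft) {
            // fill the current byte with the top bits of the value
            current |= (int) (value >>> (bitsToWrite - bitsLeft));
            bitsToWrite -= bitsLeft;
            value &= (1L << bitsToWrite) - 1;
            out.write(current);
            current = 0;
            bitsLeft = 8;
          }
          bitsLeft -= bitsToWrite;
          current |= (int) (value << bitsLeft);
          if (bitsLeft == 0) {
            out.write(current);
            current = 0;
            bitsLeft = 8;
          }
        }
        if (bitsLeft != 8) {
          out.write(current);                // flush the final partial byte
        }
        return out.toByteArray();
      }

      // unpack len values of bitSize bits each from buf
      static long[] unpack(byte[] buf, int len, int bitSize) {
        long[] out = new long[len];
        int bitsLeft = 0;
        int current = 0;
        int pos = 0;
        for (int i = 0; i < len; i++) {
          long result = 0;
          int bitsToRead = bitSize;
          while (bitsToRead > bitsLeft) {
            result <<= bitsLeft;
            result |= current & ((1 << bitsLeft) - 1);
            bitsToRead -= bitsLeft;
            current = buf[pos++] & 0xff;
            bitsLeft = 8;
          }
          if (bitsToRead > 0) {   // take the remaining high bits of current
            result <<= bitsToRead;
            bitsLeft -= bitsToRead;
            result |= (current >> bitsLeft) & ((1 << bitsToRead) - 1);
          }
          out[i] = result;
        }
        return out;
      }

      public static void main(String[] args) {
        long[] vals = {-3, -2, -1, 0, 1, 2, 3};
        long[] enc = new long[vals.length];
        for (int i = 0; i < vals.length; i++) {
          enc[i] = zigzagEncode(vals[i]);    // -3 -> 5, 3 -> 6, ...
        }
        byte[] packed = pack(enc, 3);        // every encoded value fits 3 bits
        long[] dec = unpack(packed, vals.length, 3);
        for (int i = 0; i < vals.length; i++) {
          System.out.println(vals[i] + " -> " + enc[i] + " -> "
              + zigzagDecode(dec[i]));       // prints the original value back
        }
      }
    }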
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 52defb9..64d9a9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -364,6 +364,7 @@ public boolean isCompressed() {
private final PositionedOutputStream rowIndexStream;
private boolean foundNulls;
private OutStream isPresentOutStream;
+ protected final boolean useDirectV2Encoding;
/**
* Create a tree writer.
@@ -379,6 +380,9 @@ public boolean isCompressed() {
this.isCompressed = streamFactory.isCompressed();
this.id = columnId;
this.inspector = inspector;
+ // make direct v2 encoding the default
+ // FIXME: do we need to make this user configurable?
+ this.useDirectV2Encoding = true;
if (nullable) {
isPresentOutStream = streamFactory.createStream(id,
OrcProto.Stream.Kind.PRESENT);
@@ -493,6 +497,10 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
* @return the information about the encoding of this column
*/
OrcProto.ColumnEncoding getEncoding() {
+ if (useDirectV2Encoding) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DIRECT).build();
}
@@ -618,7 +626,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class IntegerTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter writer;
+ private final IntegerWriter writer;
private final ShortObjectInspector shortInspector;
private final IntObjectInspector intInspector;
private final LongObjectInspector longInspector;
@@ -630,7 +638,11 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ if (super.useDirectV2Encoding) {
+ this.writer = new RunLengthIntegerWriterV2(out, true);
+ } else {
+ this.writer = new RunLengthIntegerWriter(out, true);
+ }
if (inspector instanceof IntObjectInspector) {
intInspector = (IntObjectInspector) inspector;
shortInspector = null;
@@ -759,8 +771,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class StringTreeWriter extends TreeWriter {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final PositionedOutputStream stringOutput;
- private final RunLengthIntegerWriter lengthOutput;
- private final RunLengthIntegerWriter rowOutput;
+ private final IntegerWriter lengthOutput;
+ private final IntegerWriter rowOutput;
private final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
private final DynamicIntArray rows = new DynamicIntArray();
@@ -776,10 +788,17 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
- rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false);
+ if (super.useDirectV2Encoding) {
+ lengthOutput = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ } else {
+ lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ }
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
@@ -881,7 +900,7 @@ long estimateMemory() {
private static class BinaryTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
- private final RunLengthIntegerWriter length;
+ private final IntegerWriter length;
BinaryTreeWriter(int columnId,
ObjectInspector inspector,
@@ -890,8 +909,13 @@ long estimateMemory() {
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.length = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
+ if (super.useDirectV2Encoding) {
+ this.length = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ this.length = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -928,18 +952,25 @@ void recordPosition(PositionRecorder recorder) throws IOException {
Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
private static class TimestampTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter seconds;
- private final RunLengthIntegerWriter nanos;
+ private final IntegerWriter seconds;
+ private final IntegerWriter nanos;
TimestampTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
- this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true);
- this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false);
+ if (super.useDirectV2Encoding) {
+ this.seconds = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ } else {
+ this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -990,7 +1021,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class DecimalTreeWriter extends TreeWriter {
private final PositionedOutputStream valueStream;
- private final RunLengthIntegerWriter scaleStream;
+ private final IntegerWriter scaleStream;
DecimalTreeWriter(int columnId,
ObjectInspector inspector,
@@ -998,8 +1029,13 @@ void recordPosition(PositionRecorder recorder) throws IOException {
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true);
+ if (super.useDirectV2Encoding) {
+ this.scaleStream = new RunLengthIntegerWriterV2(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ } else {
+ this.scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ }
recordPosition(rowIndexPosition);
}
@@ -1076,7 +1112,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
}
private static class ListTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
ListTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1088,9 +1124,13 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
childrenWriters[0] =
createTreeWriter(listObjectInspector.getListElementObjectInspector(),
writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
+ if (super.useDirectV2Encoding) {
+ lengths = new RunLengthIntegerWriterV2(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ lengths = new RunLengthIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -1126,7 +1166,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class MapTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
MapTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1139,9 +1179,15 @@ void recordPosition(PositionRecorder recorder) throws IOException {
createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
childrenWriters[1] =
createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ if (super.useDirectV2Encoding) {
+ lengths =
+ new RunLengthIntegerWriterV2(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index d19bc06..994be70 100644
--- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -66,6 +66,7 @@ message ColumnEncoding {
enum Kind {
DIRECT = 0;
DICTIONARY = 1;
+ DIRECT_V2 = 2;
}
required Kind kind = 1;
optional uint32 dictionarySize = 2;
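
On the read side, DIRECT_V2 in the stripe footer tells the reader which decoder to construct per column. The reader-side changes are not shown in this hunk; the sketch below only illustrates the kind of dispatch they imply (createIntReader and the IntegerReader supertype are assumptions here):

    // Illustrative dispatch on the new enum value; names are assumed, the
    // patch's actual reader-side wiring lives in RecordReaderImpl.
    static IntegerReader createIntReader(OrcProto.ColumnEncoding.Kind kind,
        InStream in, boolean signed) throws IOException {
      switch (kind) {
      case DIRECT_V2:
        return new RunLengthIntegerReaderV2(in, signed);
      case DIRECT:
        return new RunLengthIntegerReader(in, signed);
      default:
        throw new IllegalArgumentException("unknown encoding kind " + kind);
      }
    }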
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
new file mode 100644
index 0000000..5d7cf1d
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+public class TestBitPack {
+
+ private static final int SIZE = 100;
+ private static Random rand = new Random(100);
+
+ private long[] zigzagEncode(long[] inp) {
+ long[] output = new long[inp.length];
+ for (int i = 0; i < inp.length; i++) {
+ output[i] = SerializationUtils.zigzagEncode(inp[i]);
+ }
+ return output;
+ }
+
+ // uniform random long in [0, n), via rejection sampling
+ private long nextLong(Random rng, long n) {
+ long bits, val;
+ do {
+ bits = (rng.nextLong() << 1) >>> 1;
+ val = bits % n;
+ } while (bits - val + (n - 1) < 0L);
+ return val;
+ }
+
+ private void runTest(int numBits) throws IOException {
+ long[] inp = new long[SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ long val = 0;
+ if (numBits <= 32) {
+ if (numBits == 1) {
+ val = -1 * rand.nextInt(2);
+ } else {
+ val = rand.nextInt((int) Math.pow(2, numBits - 1));
+ }
+ } else {
+ val = nextLong(rand, (long) Math.pow(2, numBits - 2));
+ }
+ if (val % 2 == 0) {
+ val = -val;
+ }
+ inp[i] = val;
+ }
+ long[] encoded = zigzagEncode(inp);
+ long minInput = Collections.min(Longs.asList(encoded));
+ long maxInput = Collections.max(Longs.asList(encoded));
+ long rangeInput = maxInput - minInput;
+ int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ OutStream output = new OutStream("test", SIZE, null, collect);
+ SerializationUtils.writeInts(encoded, 0, encoded.length,
+ fixedWidth, output);
+ output.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ long[] buff = new long[SIZE];
+ SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
+ InStream.create("test", inBuf, null, SIZE));
+ for (int i = 0; i < SIZE; i++) {
+ buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+ }
+ assertEquals(numBits, fixedWidth);
+ assertArrayEquals(inp, buff);
+ }
+
+ @Test
+ public void test01BitPacking1Bit() throws IOException {
+ runTest(1);
+ }
+
+ @Test
+ public void test02BitPacking2Bit() throws IOException {
+ runTest(2);
+ }
+
+ @Test
+ public void test03BitPacking3Bit() throws IOException {
+ runTest(3);
+ }
+
+ @Test
+ public void test04BitPacking4Bit() throws IOException {
+ runTest(4);
+ }
+
+ @Test
+ public void test05BitPacking5Bit() throws IOException {
+ runTest(5);
+ }
+
+ @Test
+ public void test06BitPacking6Bit() throws IOException {
+ runTest(6);
+ }
+
+ @Test
+ public void test07BitPacking7Bit() throws IOException {
+ runTest(7);
+ }
+
+ @Test
+ public void test08BitPacking8Bit() throws IOException {
+ runTest(8);
+ }
+
+ @Test
+ public void test09BitPacking9Bit() throws IOException {
+ runTest(9);
+ }
+
+ @Test
+ public void test10BitPacking10Bit() throws IOException {
+ runTest(10);
+ }
+
+ @Test
+ public void test11BitPacking11Bit() throws IOException {
+ runTest(11);
+ }
+
+ @Test
+ public void test12BitPacking12Bit() throws IOException {
+ runTest(12);
+ }
+
+ @Test
+ public void test13BitPacking13Bit() throws IOException {
+ runTest(13);
+ }
+
+ @Test
+ public void test14BitPacking14Bit() throws IOException {
+ runTest(14);
+ }
+
+ @Test
+ public void test15BitPacking15Bit() throws IOException {
+ runTest(15);
+ }
+
+ @Test
+ public void test16BitPacking16Bit() throws IOException {
+ runTest(16);
+ }
+
+ @Test
+ public void test17BitPacking17Bit() throws IOException {
+ runTest(17);
+ }
+
+ @Test
+ public void test18BitPacking18Bit() throws IOException {
+ runTest(18);
+ }
+
+ @Test
+ public void test19BitPacking19Bit() throws IOException {
+ runTest(19);
+ }
+
+ @Test
+ public void test20BitPacking20Bit() throws IOException {
+ runTest(20);
+ }
+
+ @Test
+ public void test21BitPacking21Bit() throws IOException {
+ runTest(21);
+ }
+
+ @Test
+ public void test22BitPacking22Bit() throws IOException {
+ runTest(22);
+ }
+
+ @Test
+ public void test23BitPacking23Bit() throws IOException {
+ runTest(23);
+ }
+
+ @Test
+ public void test24BitPacking24Bit() throws IOException {
+ runTest(24);
+ }
+
+ @Test
+ public void test26BitPacking26Bit() throws IOException {
+ runTest(26);
+ }
+
+ @Test
+ public void test28BitPacking28Bit() throws IOException {
+ runTest(28);
+ }
+
+ @Test
+ public void test30BitPacking30Bit() throws IOException {
+ runTest(30);
+ }
+
+ @Test
+ public void test32BitPacking32Bit() throws IOException {
+ runTest(32);
+ }
+
+ @Test
+ public void test40BitPacking40Bit() throws IOException {
+ runTest(40);
+ }
+
+ @Test
+ public void test48BitPacking48Bit() throws IOException {
+ runTest(48);
+ }
+
+ @Test
+ public void test56BitPacking56Bit() throws IOException {
+ runTest(56);
+ }
+
+ @Test
+ public void test64BitPacking64Bit() throws IOException {
+ runTest(64);
+ }
+}
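
The assertEquals(numBits, fixedWidth) assertion in runTest holds because the generator is sized so the zigzag-encoded range needs exactly numBits. A worked check of that arithmetic for numBits = 4 (standalone, illustrative names):

    // For numBits = 4 the test draws from [0, 2^3) and negates evens, so the
    // inputs lie in {-6, -4, -2, 0, 1, 3, 5, 7}; zigzag maps the extremes 7
    // and -6 to 14 and 11, and 14 needs exactly 4 bits.
    public class WidthCheck {
      public static void main(String[] args) {
        long maxOdd = 7;                                // largest positive draw
        long zzOdd = (maxOdd << 1) ^ (maxOdd >> 63);    // 14
        long minEven = -6;                              // largest even, negated
        long zzEven = (minEven << 1) ^ (minEven >> 63); // 11
        System.out.println("max zigzag = " + Math.max(zzOdd, zzEven)); // 14
      }
    }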
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
index a3cf6e9..c5d4a5f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
@@ -18,15 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import java.io.BufferedReader;
import java.io.File;
@@ -35,8 +28,13 @@
import java.io.PrintStream;
import java.util.Random;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.junit.Before;
+import org.junit.Test;
public class TestFileDump {
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
new file mode 100644
index 0000000..0fb0fcb
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class TestIntegerCompressionReader {
+
+ public void runSeekTest(CompressionCodec codec) throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+ new OutStream("test", 1000, codec, collect), true);
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[4096];
+ Random random = new Random(99);
+ int[] junk = new int[2048];
+ for(int i=0; i < junk.length; ++i) {
+ junk[i] = random.nextInt();
+ }
+ for(int i=0; i < 4096; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ // test runs, incrementing runs, non-runs
+ if (i < 1024) {
+ out.write(i/4);
+ } else if (i < 2048) {
+ out.write(2*i);
+ } else {
+ out.write(junk[i-2048]);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+ ("test", inBuf, codec, 1000), true);
+ for(int i=0; i < 4096; ++i) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ for(int i=4095; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+ new OutStream("test", 100, null, collect), true);
+ for(int i=0; i < 2048; ++i) {
+ if (i < 1024) {
+ out.write(i);
+ } else {
+ out.write(256 * i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+ ("test", inBuf, null, 100), true);
+ for(int i=0; i < 2048; i += 10) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i, x);
+ } else {
+ assertEquals(256 * i, x);
+ }
+ if (i < 2038) {
+ in.skip(9);
+ }
+ in.skip(0);
+ }
+ }
+}
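
The seek test passes because RunLengthIntegerWriterV2.getPosition() (shown earlier) records two coordinates: the position of the underlying byte stream and the number of values still buffered in literals at that moment. A seek therefore repositions the stream and then skips the buffered count. A sketch of how the reader consumes such a position, with assumed names mirroring the existing PositionProvider pattern:

    // Illustrative reader-side seek, assuming the V2 reader follows the same
    // PositionProvider protocol as the V1 RunLengthIntegerReader.
    void seek(PositionProvider index) throws IOException {
      input.seek(index);                    // byte-stream position(s) first
      int buffered = (int) index.getNext(); // values recorded after that point
      if (buffered != 0) {
        skip(buffered);                     // decode and discard to the row
      }
    }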
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
new file mode 100644
index 0000000..bdc7a15
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -0,0 +1,793 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+public class TestNewIntegerEncoding {
+
+ public static class Row {
+ Integer int1;
+ Long long1;
+
+ public Row(int val, long l) {
+ this.int1 = val;
+ this.long1 = l;
+ }
+ }
+
+ public List<Long> fetchData(String path) throws IOException {
+ List<Long> input = new ArrayList<Long>();
+ FileInputStream stream = new FileInputStream(new File(path));
+ try {
+ FileChannel fc = stream.getChannel();
+ MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+ /* decode the whole file with the default charset and split into lines */
+ String[] lines = Charset.defaultCharset().decode(bb).toString()
+ .split("\n");
+ for (String line : lines) {
+ long val = 0;
+ try {
+ val = Long.parseLong(line);
+ } catch (NumberFormatException e) {
+ // for now let's ignore bad lines (assign 0)
+ }
+ input.add(val);
+ }
+ } finally {
+ stream.close();
+ }
+ return input;
+ }
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir", "target"
+ + File.separator + "test" + File.separator + "tmp"));
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+ String resDir = "ql/src/test/resources";
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ testFilePath = new Path(workDir, "TestOrcFile."
+ + testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
+
+ @Test
+ public void testBasicRow() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ writer.addRow(new Row(111, 1111L));
+ writer.addRow(new Row(111, 1111L));
+ writer.addRow(new Row(111, 1111L));
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0));
+ assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1));
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+ 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+ 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+ 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+ 1, 1, 1, 1 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta1() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { -500, -400, -350, -325, -310 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta2() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { -500, -600, -650, -675, -710 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta3() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 500, 400, 350, 325, 310 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta4() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 500, 600, 650, 675, 710 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testIntegerMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add((long) Integer.MIN_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testIntegerMax() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add((long) Integer.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testLongMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add(Long.MIN_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testLongMax() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add(Long.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testRandomInt() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 100000; i++) {
+ input.add((long) rand.nextInt());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testRandomLong() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 100000; i++) {
+ input.add(rand.nextLong());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+ 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+ 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+ 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+ 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -13, 1, 2, 3,
+ 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+ 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+ 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+ 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+ 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+ 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+ 2, 16 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin2() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+ 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+ 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+ 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+ 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -1, 1, 2, 3,
+ 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+ 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+ 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+ 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+ 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+ 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+ 2, 16 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin3() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+ 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+ 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+ 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+ 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, 0, 1, 2, 3,
+ 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+ 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+ 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+ 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+ 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+ 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+ 2, 16 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin4() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 13, 13, 11, 8, 13, 10, 10, 11, 11, 14, 11, 7, 13,
+ 12, 12, 11, 15, 12, 12, 9, 8, 10, 13, 11, 8, 6, 5, 6, 11, 7, 15, 10, 7,
+ 6, 8, 7, 9, 9, 11, 33, 11, 3, 7, 4, 6, 10, 14, 12, 5, 14, 7, 6 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt0() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(0, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt1() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(1, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt255() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(255, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt256() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(256, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBase510() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(510, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBase511() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(511, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 100000; i++) {
+ input.add((long) rand.nextInt());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 55555;
+ rows.seekToRow(idx);
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+}
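
The testPatchedBase* cases exercise the PATCHED_BASE mode of the V2 encoding: values are stored as deltas from a base at the bit width that fits the large majority (a percentile choice, see percentileBits above), while the few outliers keep only their low bits inline and contribute their high bits and positions to a separate patch list. A small worked sketch under those assumptions (constants and names are illustrative, not the writer's exact algorithm):

    import java.util.Arrays;

    // Worked illustration of the patched-base idea on data shaped like
    // testPatchedBaseAt0: mostly small values plus a single outlier.
    public class PatchedBaseSketch {
      static int bits(long v) {              // bits needed for v >= 0
        int n = 0;
        while (v != 0) { n++; v >>>= 1; }
        return Math.max(n, 1);
      }

      public static void main(String[] args) {
        long[] vals = {20000, 12, 7, 55, 93, 4, 76};
        long base = Arrays.stream(vals).min().getAsLong(); // 4
        long[] deltas = new long[vals.length];
        for (int i = 0; i < vals.length; i++) {
          deltas[i] = vals[i] - base;
        }
        int width = 7; // assumed percentile pick; covers every delta but one
        for (int i = 0; i < deltas.length; i++) {
          if (bits(deltas[i]) > width) {
            // inline slot keeps the low 7 bits; the high bits plus the
            // position form a patch entry
            System.out.println("patch @" + i + ", high bits = "
                + (deltas[i] >>> width));
          }
        }
        System.out.println("base = " + base + ", width = " + width);
      }
    }

Packing six of the seven deltas at 7 bits plus one patch entry is far smaller than packing all seven at the 15 bits the outlier alone would force.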
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
index 9f989fd..2f7a7f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.ql.io.orc;
import static junit.framework.Assert.assertEquals;