diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2ab4826..8e2e90b 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -502,6 +502,9 @@
// Maximum fraction of heap that can be used by ORC file writers
HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+    // use the 0.11 version of the RLE encoding. If this conf is not
+    // defined, or any other value is specified, ORC will use the new
+    // RLE encoding
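+    // (illustrative: "set hive.exec.orc.write.format=0.11" in a session
+    // keeps newly written files readable by Hive 0.11 ORC readers)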
+ HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", "0.11"),
HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index 19a6d0e..55ac8c6 100644
--- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -5695,10 +5695,14 @@ public ColumnEncoding getDefaultInstanceForType() {
implements com.google.protobuf.ProtocolMessageEnum {
DIRECT(0, 0),
DICTIONARY(1, 1),
+ DIRECT_V2(2, 2),
+ DICTIONARY_V2(3, 3),
;
public static final int DIRECT_VALUE = 0;
public static final int DICTIONARY_VALUE = 1;
+ public static final int DIRECT_V2_VALUE = 2;
+ public static final int DICTIONARY_V2_VALUE = 3;
public final int getNumber() { return value; }
@@ -5707,6 +5711,8 @@ public static Kind valueOf(int value) {
switch (value) {
case 0: return DIRECT;
case 1: return DICTIONARY;
+ case 2: return DIRECT_V2;
+ case 3: return DICTIONARY_V2;
default: return null;
}
}
@@ -5737,7 +5743,7 @@ public Kind findValueByNumber(int number) {
}
private static final Kind[] VALUES = {
- DIRECT, DICTIONARY,
+ DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2,
};
public static Kind valueOf(
@@ -11117,42 +11123,42 @@ void setMagic(com.google.protobuf.ByteString value) {
"eam.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004",
"\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGT" +
"H\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_C" +
- "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n" +
+ "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n" +
"\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.apac" +
"he.hadoop.hive.ql.io.orc.ColumnEncoding." +
- "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006" +
- "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFoote" +
- "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" +
- "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." +
- "org.apache.hadoop.hive.ql.io.orc.ColumnE",
- "ncoding\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" +
- "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s" +
- "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001" +
- "\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002" +
- "\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE" +
- "\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020" +
- "\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNIO" +
- "N\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInfo" +
- "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" +
- " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength",
- "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" +
- "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002" +
- "\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconten" +
- "tLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apa" +
- "che.hadoop.hive.ql.io.orc.StripeInformat" +
- "ion\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.h" +
- "ive.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.o" +
- "rg.apache.hadoop.hive.ql.io.orc.UserMeta" +
- "dataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatis" +
- "tics\030\007 \003(\01322.org.apache.hadoop.hive.ql.i",
- "o.orc.ColumnStatistics\022\026\n\016rowIndexStride" +
- "\030\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 " +
- "\001(\004\022F\n\013compression\030\002 \001(\01621.org.apache.ha" +
- "doop.hive.ql.io.orc.CompressionKind\022\034\n\024c" +
- "ompressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003" +
- "(\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKin" +
- "d\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO" +
- "\020\003"
+ "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006" +
+ "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022" +
+ "\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n\007s" +
+ "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" +
+ ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap",
+ "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
+ "g\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" +
+ "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" +
+ "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001\n\004Kind" +
+ "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" +
+ "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" +
+ "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" +
+ "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" +
+ "\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInformatio" +
+ "n\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022",
+ "\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004" +
+ "\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataIt" +
+ "em\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Foot" +
+ "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" +
+ "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" +
+ "doop.hive.ql.io.orc.StripeInformation\0225\n" +
+ "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql" +
+ ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" +
+ "che.hadoop.hive.ql.io.orc.UserMetadataIt" +
+ "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007",
+ " \003(\01322.org.apache.hadoop.hive.ql.io.orc." +
+ "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" +
+ "\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" +
+ "\013compression\030\002 \001(\01621.org.apache.hadoop.h" +
+ "ive.ql.io.orc.CompressionKind\022\034\n\024compres" +
+ "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" +
+ "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" +
+ "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
new file mode 100644
index 0000000..04cfa58
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading integers.
+ */
+interface IntegerReader {
+
+ /**
+ * Seek to the position provided by index.
+   * @param index position provider with the stream offsets to seek to
+ * @throws IOException
+ */
+ void seek(PositionProvider index) throws IOException;
+
+ /**
+   * Skip the specified number of values.
+   * @param numValues the number of values to skip
+ * @throws IOException
+ */
+ void skip(long numValues) throws IOException;
+
+ /**
+ * Check if there are any more values left.
+   * @return true if there are more values to read
+ * @throws IOException
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return the next available value.
+   * @return the next value
+ * @throws IOException
+ */
+ long next() throws IOException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
new file mode 100644
index 0000000..775d02e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+interface IntegerWriter {
+
+ /**
+   * Record the current position in the stream.
+   * @param recorder the recorder that captures the current position
+ * @throws IOException
+ */
+ void getPosition(PositionRecorder recorder) throws IOException;
+
+ /**
+   * Write the integer value.
+   * @param value the value to write
+ * @throws IOException
+ */
+ void write(long value) throws IOException;
+
+ /**
+   * Flush the buffer.
+ * @throws IOException
+ */
+ void flush() throws IOException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 3038e26..18c5ac8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -130,6 +131,21 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
}
}
+ IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+ InStream in,
+ boolean signed) throws IOException {
+ switch (kind) {
+ case DIRECT_V2:
+ case DICTIONARY_V2:
+ return new RunLengthIntegerReaderV2(in, signed);
+ case DIRECT:
+ case DICTIONARY:
+ return new RunLengthIntegerReader(in, signed);
+ default:
+ throw new IllegalArgumentException("Unknown encoding " + kind);
+ }
+ }
+
   void startStripe(Map<StreamName, InStream> streams,
                    List<OrcProto.ColumnEncoding> encoding
) throws IOException {
@@ -266,20 +282,29 @@ void skipRows(long items) throws IOException {
}
private static class ShortTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
ShortTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -310,20 +335,29 @@ void skipRows(long items) throws IOException {
}
private static class IntTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
IntTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -354,20 +388,29 @@ void skipRows(long items) throws IOException {
}
private static class LongTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
LongTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -492,13 +535,22 @@ void skipRows(long items) throws IOException {
private static class BinaryTreeReader extends TreeReader{
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
BinaryTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -506,9 +558,8 @@ void startStripe(Map<StreamName, InStream> streams,
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
- false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
}
@Override
@@ -555,22 +606,33 @@ void skipRows(long items) throws IOException {
}
private static class TimestampTreeReader extends TreeReader{
- private RunLengthIntegerReader data;
- private RunLengthIntegerReader nanos;
+ private IntegerReader data = null;
+ private IntegerReader nanos = null;
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.DATA)), true);
- nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.SECONDARY)), false);
+ data = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
}
@Override
@@ -625,20 +687,29 @@ void skipRows(long items) throws IOException {
}
private static class DateTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
DateTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -670,20 +741,29 @@ void skipRows(long items) throws IOException {
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
- private RunLengthIntegerReader scaleStream;
+ private IntegerReader scaleStream = null;
DecimalTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
valueStream = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
- scaleStream = new RunLengthIntegerReader(streams.get(
+ scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
}
@@ -726,12 +806,9 @@ void skipRows(long items) throws IOException {
super(path, columnId);
}
+ @Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
- encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
- throw new IOException("Unknown encoding " + encoding + " in column " +
- columnId + " of " + path);
- }
+ reader.checkEncoding(encoding);
}
@Override
@@ -742,9 +819,11 @@ void startStripe(Map<StreamName, InStream> streams,
// reader
switch (encodings.get(columnId).getKind()) {
case DIRECT:
+ case DIRECT_V2:
reader = new StringDirectTreeReader(path, columnId);
break;
case DICTIONARY:
+ case DICTIONARY_V2:
reader = new StringDictionaryTreeReader(path, columnId);
break;
default:
@@ -776,7 +855,7 @@ void skipRows(long items) throws IOException {
*/
private static class StringDirectTreeReader extends TreeReader {
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths;
StringDirectTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -784,7 +863,11 @@ void skipRows(long items) throws IOException {
@Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- // PASS
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
}
@Override
@@ -795,8 +878,8 @@ void startStripe(Map<StreamName, InStream> streams,
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
false);
}
@@ -851,7 +934,7 @@ void skipRows(long items) throws IOException {
private static class StringDictionaryTreeReader extends TreeReader {
private DynamicByteArray dictionaryBuffer;
private int[] dictionaryOffsets;
- private RunLengthIntegerReader reader;
+ private IntegerReader reader;
StringDictionaryTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -859,7 +942,11 @@ void skipRows(long items) throws IOException {
@Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- // PASS
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
}
@Override
@@ -884,7 +971,8 @@ void startStripe(Map<StreamName, InStream> streams,
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = streams.get(name);
- RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
+ .getKind(), in, false);
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
@@ -899,7 +987,8 @@ void startStripe(Map<StreamName, InStream> streams,
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), false);
+ reader = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(name), false);
}
@Override
@@ -1097,7 +1186,7 @@ void skipRows(long items) throws IOException {
private static class ListTreeReader extends TreeReader {
private final TreeReader elementReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
ListTreeReader(Path path, int columnId,
                   List<OrcProto.Type> types,
@@ -1146,12 +1235,22 @@ Object next(Object previous) throws IOException {
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
if (elementReader != null) {
elementReader.startStripe(streams, encodings);
}
@@ -1171,7 +1270,7 @@ void skipRows(long items) throws IOException {
private static class MapTreeReader extends TreeReader {
private final TreeReader keyReader;
private final TreeReader valueReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
MapTreeReader(Path path,
int columnId,
@@ -1224,12 +1323,22 @@ Object next(Object previous) throws IOException {
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
if (keyReader != null) {
keyReader.startStripe(streams, encodings);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
index 2825c64..b737b0f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
@@ -23,7 +23,7 @@
/**
* A reader that reads a sequence of integers.
* */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
private final InStream input;
private final boolean signed;
private final long[] literals =
@@ -71,11 +71,13 @@ private void readValues() throws IOException {
}
}
- boolean hasNext() throws IOException {
+ @Override
+ public boolean hasNext() throws IOException {
return used != numLiterals || input.available() > 0;
}
- long next() throws IOException {
+ @Override
+ public long next() throws IOException {
long result;
if (used == numLiterals) {
readValues();
@@ -88,7 +90,8 @@ long next() throws IOException {
return result;
}
- void seek(PositionProvider index) throws IOException {
+ @Override
+ public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
if (consumed != 0) {
@@ -104,7 +107,8 @@ void seek(PositionProvider index) throws IOException {
}
}
- void skip(long numValues) throws IOException {
+ @Override
+ public void skip(long numValues) throws IOException {
while (numValues > 0) {
if (used == numLiterals) {
readValues();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..4a4165f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+
+/**
+ * A reader that reads a sequence of lightweight compressed integers. Refer
+ * to {@link RunLengthIntegerWriterV2} for a description of the various
+ * lightweight compression techniques.
+ */
+class RunLengthIntegerReaderV2 implements IntegerReader {
+ private final InStream input;
+ private final boolean signed;
+ private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+ private int numLiterals = 0;
+ private int used = 0;
+
+ RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException {
+ this.input = input;
+ this.signed = signed;
+ }
+
+ private void readValues() throws IOException {
+ // read the first 2 bits and determine the encoding type
+ int firstByte = input.read();
+ if (firstByte < 0) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ } else {
+ int enc = (firstByte >>> 6) & 0x03;
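+      // e.g. (illustrative) firstByte 0x8A = 1000_1010 has top bits 0b10,
+      // selecting EncodingType.PATCHED_BASE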
+ if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
+ readShortRepeatValues(firstByte);
+ } else if (EncodingType.DIRECT.ordinal() == enc) {
+ readDirectValues(firstByte);
+ } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
+ readPatchedBaseValues(firstByte);
+ } else {
+ readDeltaValues(firstByte);
+ }
+ }
+ }
+
+ private void readDeltaValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fb = (firstByte >>> 1) & 0x1f;
+ if (fb != 0) {
+ fb = SerializationUtils.decodeBitWidth(fb);
+ }
+
+ // extract the blob run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+
+ // read the first value stored as vint
+ long firstVal = 0;
+ if (signed) {
+ firstVal = SerializationUtils.readVslong(input);
+ } else {
+ firstVal = SerializationUtils.readVulong(input);
+ }
+
+ // store first value to result buffer
+ long prevVal = firstVal;
+ literals[numLiterals++] = firstVal;
+
+ // if fixed bits is 0 then all values have fixed delta
+ if (fb == 0) {
+      // read the fixed delta value stored as vint (deltas can be negative
+      // even if all numbers are positive)
+ long fd = SerializationUtils.readVslong(input);
+
+      // add fixed deltas to adjacent values. the array index on the left is
+      // evaluated before the increment, so the right-hand side reads the
+      // previously stored element
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ }
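+      // e.g. (illustrative) a stored first value of 2 with fixed delta 1
+      // and len 3 expands to the sequence 2 3 4 5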
+ } else {
+ long deltaBase = SerializationUtils.readVslong(input);
+ // add delta base and first value
+ literals[numLiterals++] = firstVal + deltaBase;
+ prevVal = literals[numLiterals - 1];
+ len -= 1;
+
+ // write the unpacked values, add it to previous value and store final
+ // value to result buffer. if the delta base value is negative then it
+ // is a decreasing sequence else an increasing sequence
+ SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+ while (len > 0) {
+ if (deltaBase < 0) {
+ literals[numLiterals] = prevVal - literals[numLiterals];
+ } else {
+ literals[numLiterals] = prevVal + literals[numLiterals];
+ }
+ prevVal = literals[numLiterals];
+ len--;
+ numLiterals++;
+ }
+ }
+ }
+
+ private void readPatchedBaseValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = SerializationUtils.decodeBitWidth(fbo);
+
+ // extract the run length of data blob
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are always one off
+ len += 1;
+
+ // extract the number of bytes occupied by base
+ int thirdByte = input.read();
+ int bw = (thirdByte >>> 5) & 0x07;
+ // base width is one off
+ bw += 1;
+
+ // extract patch width
+ int pwo = thirdByte & 0x1f;
+ int pw = SerializationUtils.decodeBitWidth(pwo);
+
+ // read fourth byte and extract patch gap width
+ int fourthByte = input.read();
+ int pgw = (fourthByte >>> 5) & 0x07;
+ // patch gap width is one off
+ pgw += 1;
+
+ // extract the length of the patch list
+ int pl = fourthByte & 0x1f;
+
+ // read the next base width number of bytes to extract base value
+ long base = SerializationUtils.bytesToLongBE(input, bw);
+ long mask = (1L << ((bw * 8) - 1));
+    // if the MSB of the base value is 1 then the base is negative, else
+    // positive
+ if ((base & mask) != 0) {
+ base = base & ~mask;
+ base = -base;
+ }
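+    // e.g. (illustrative) bw = 1 and base byte 0x85: the sign mask 0x80 is
+    // set, so base = -(0x85 & ~0x80) = -5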
+
+ // unpack the data blob
+ long[] unpacked = new long[len];
+ SerializationUtils.readInts(unpacked, 0, len, fb, input);
+
+ // unpack the patch blob
+ long[] unpackedPatch = new long[pl];
+ SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input);
+
+ // apply the patch directly when decoding the packed data
+ int patchIdx = 0;
+ long currGap = 0;
+ long currPatch = 0;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+    currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ long actualGap = 0;
+
+    // special case: if the gap is > 255 then the patch value will be 0.
+    // if the gap is <= 255 then the patch value cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+      currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ }
+ // add the left over gap
+ actualGap += currGap;
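+    // e.g. (illustrative) a gap of 300 was written as the pair (255, 0)
+    // followed by (45, patch), so the loop above accumulates 255 and the
+    // leftover 45 gives actualGap = 300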
+
+ // unpack data blob, patch it (if required), add base to get final result
+ for(int i = 0; i < unpacked.length; i++) {
+ if (i == actualGap) {
+ // extract the patch value
+ long patchedVal = unpacked[i] | (currPatch << fb);
+
+ // add base to patched value
+ literals[numLiterals++] = base + patchedVal;
+
+ // increment the patch to point to next entry in patch list
+ patchIdx++;
+
+ if (patchIdx < pl) {
+ // read the next gap and patch
+ currGap = unpackedPatch[patchIdx] >>> pw;
+          currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ actualGap = 0;
+
+          // special case: if the gap is > 255 then the patch will be 0.
+          // if the gap is <= 255 then the patch cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+            currPatch = unpackedPatch[patchIdx] & ((1L << pw) - 1);
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // next gap is relative to the current gap
+ actualGap += i;
+ }
+ } else {
+ // no patching required. add base to unpacked value to get final value
+ literals[numLiterals++] = base + unpacked[i];
+ }
+ }
+
+ }
+
+ private void readDirectValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = SerializationUtils.decodeBitWidth(fbo);
+
+ // extract the run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are one off
+ len += 1;
+
+ // write the unpacked values and zigzag decode to result buffer
+ SerializationUtils.readInts(literals, numLiterals, len, fb, input);
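+    // zigzag maps signed to unsigned (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...)
+    // so decoding below restores the original signed values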
+ if (signed) {
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals] = SerializationUtils
+ .zigzagDecode(literals[numLiterals]);
+ numLiterals++;
+ }
+ } else {
+ numLiterals += len;
+ }
+ }
+
+ private void readShortRepeatValues(int firstByte) throws IOException {
+
+ // read the number of bytes occupied by the value
+ int size = (firstByte >>> 3) & 0x07;
+ // #bytes are one off
+ size += 1;
+
+ // read the run length
+ int len = firstByte & 0x07;
+    // run length values are stored only after the MIN_REPEAT count is met
+ len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+    // read the repeated value which is stored using fixed bytes
+ long val = SerializationUtils.bytesToLongBE(input, size);
+
+ if (signed) {
+ val = SerializationUtils.zigzagDecode(val);
+ }
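+    // e.g. (illustrative) header 0x02 followed by byte 0x07: size = 1,
+    // len = 2 + MIN_REPEAT = 5, val = 7 (or -4 after zigzag when signed)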
+
+ // repeat the value for length times
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals++] = val;
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ @Override
+ public long next() throws IOException {
+ long result;
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues();
+ }
+ result = literals[used++];
+ return result;
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two
+ // parts
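+      // (e.g. a consumed count of 520 with a 512-long first run leaves
+      // used = 8 within the run read on the second pass)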
+ while (consumed > 0) {
+ numLiterals = 0;
+ readValues();
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ @Override
+ public void skip(long numValues) throws IOException {
+ while (numValues > 0) {
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues();
+ }
+ long consume = Math.min(numValues, numLiterals - used);
+ used += consume;
+ numValues -= consume;
+ }
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
index aaca0a1..539f8df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
@@ -25,7 +25,7 @@
* repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
* literal vint values follow.
*/
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
static final int MIN_REPEAT_SIZE = 3;
static final int MAX_DELTA = 127;
static final int MIN_DELTA = -128;
@@ -71,12 +71,14 @@ private void writeValues() throws IOException {
}
}
- void flush() throws IOException {
+ @Override
+ public void flush() throws IOException {
writeValues();
output.flush();
}
- void write(long value) throws IOException {
+ @Override
+ public void write(long value) throws IOException {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
@@ -130,8 +132,10 @@ void write(long value) throws IOException {
}
}
- void getPosition(PositionRecorder recorder) throws IOException {
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
output.getPosition(recorder);
recorder.addPosition(numLiterals);
}
+
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..a4497b3
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs lightweight compression over a sequence of
+ * integers. There are four types of lightweight integer compression:
+ * SHORT_REPEAT, DIRECT, PATCHED_BASE and DELTA.
+ *
+ * The description and format of these types are as follows:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *   - 1 byte header
+ *     - 2 bits for encoding type
+ *     - 3 bits for bytes required for repeating value
+ *     - 3 bits for repeat count (MIN_REPEAT + run length)
+ *   - Blob - repeat value (fixed bytes)
+ *
+ * DIRECT: Used for random integer sequences whose bit requirement
+ * doesn't vary a lot.
+ *   - 2 byte header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *   - Blob - stores the direct values using fixed bit width. The length of
+ *     the data blob is (fixed width * run length) bits.
+ *
+ * PATCHED_BASE: Used for random integer sequences whose bit requirement
+ * varies beyond a threshold.
+ *   - 4 byte header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *     - 3rd byte
+ *       - 3 bits for bytes required to encode base value
+ *       - 5 bits for patch width
+ *     - 4th byte
+ *       - 3 bits for patch gap width
+ *       - 5 bits for patch length
+ *   - Base value - stored using a fixed number of bytes. If the MSB is set,
+ *     the base value is negative, else positive. The length of the base
+ *     value is (base width * 8) bits.
+ *   - Data blob - base-reduced values stored using fixed bit width. The
+ *     length of the data blob is (fixed width * run length) bits.
+ *   - Patch blob - a list of gap and patch value entries. Each entry in the
+ *     patch list is (patch width + patch gap width) bits long. The gap
+ *     between the subsequent elements to be patched is stored in the upper
+ *     part of the entry, whereas the patch values are stored in the lower
+ *     part. The length of the patch blob is
+ *     ((patch width + patch gap width) * patch length) bits.
+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ *   - 2 byte header
+ *     - 1st byte
+ *       - 2 bits for encoding type
+ *       - 5 bits for fixed bit width of values in blob
+ *       - 1 bit for storing MSB of run length
+ *     - 2nd byte
+ *       - 8 bits for lower run length bits
+ *   - Base value - encoded as varint
+ *   - Delta base - encoded as varint
+ *   - Delta blob - only positive values. Monotonicity and order are decided
+ *     based on the sign of the base value and the delta base.
+ *
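+ * Illustrative example (not normative): the unsigned sequence 4 6 8 10 is a
+ * fixed-delta run, stored under DELTA as header bytes 0xC0 0x03 followed by
+ * the first value 4 and the delta 2, both written as varints.
+ *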
+ */
+class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+ public enum EncodingType {
+ SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+ }
+
+ static final int MAX_SCOPE = 512;
+ static final int MIN_REPEAT = 3;
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ private long prevDelta = 0;
+ private int fixedRunLength = 0;
+ private int variableRunLength = 0;
+ private final long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private EncodingType encoding;
+ private int numLiterals;
+ private long[] zigzagLiterals;
+ private long[] baseRedLiterals;
+ private long[] adjDeltas;
+ private long fixedDelta;
+ private int zzBits90p;
+ private int zzBits100p;
+ private int brBits95p;
+ private int brBits100p;
+ private int bitsDeltaMax;
+ private int patchWidth;
+ private int patchGapWidth;
+ private int patchLength;
+ private long[] gapVsPatchList;
+ private long min;
+ private boolean isFixedDelta;
+
+ RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+ this.output = output;
+ this.signed = signed;
+ clear();
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+
+ if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+ writeShortRepeatValues();
+ } else if (encoding.equals(EncodingType.DIRECT)) {
+ writeDirectValues();
+ } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+ writePatchedBaseValues();
+ } else {
+ writeDeltaValues();
+ }
+
+ // clear all the variables
+ clear();
+ }
+ }
+
+ private void writeDeltaValues() throws IOException {
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int efb = 0;
+
+ if (isFixedDelta) {
+ // if fixed run length is greater than threshold then it will be fixed
+ // delta sequence with delta value 0 else fixed delta sequence with
+ // non-zero delta value
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ // fixed width 0 is used for long repeating values.
+ // sequences that require only 1 bit to encode will have an additional bit
+ if (fb == 1) {
+ fb = 2;
+ }
+ efb = SerializationUtils.encodeBitWidth(fb);
+ efb = efb << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = len & 0xff;
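+    // e.g. (illustrative) len = 300 = 0x12C: tailBits = 1 and the second
+    // header byte is 0x2C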
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first value from zigzag literal array
+ if (signed) {
+ SerializationUtils.writeVslong(output, literals[0]);
+ } else {
+ SerializationUtils.writeVulong(output, literals[0]);
+ }
+
+ if (isFixedDelta) {
+ // if delta is fixed then we don't need to store delta blob
+ SerializationUtils.writeVslong(output, fixedDelta);
+ } else {
+ // store the first value as delta value using zigzag encoding
+ SerializationUtils.writeVslong(output, adjDeltas[0]);
+ // adjacent delta values are bit packed
+ SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb,
+ output);
+ }
+ }
+
+ private void writePatchedBaseValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = brBits95p;
+ int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // if the min value is negative toggle the sign
+    boolean isNegative = min < 0;
+ if (isNegative) {
+ min = -min;
+ }
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width. The additional bit is used to store the sign
+ // of the base value.
+ int baseWidth = SerializationUtils.findClosestNumBits(min) + 1;
+ int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ int bb = (baseBytes - 1) << 5;
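+    // e.g. (illustrative) min = -100 needs 7 bits plus the sign bit, so
+    // baseWidth = 8, baseBytes = 1 and bb = 0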
+
+ // if the base value is negative then set MSB to 1
+ if (isNegative) {
+ min |= (1L << ((baseBytes * 8) - 1));
+ }
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+
+ // fourth byte contains 3 bits for page gap width and 5 bits for
+ // patch length
+ int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for(int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((min >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // base reduced literals are bit packed
+ int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
+ SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length,
+ closestFixedBits, output);
+
+ // write patch list
+ closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
+ + patchWidth);
+ SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length,
+ closestFixedBits, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ /**
+ * Store the opcode in 2 MSB bits
+ * @return opcode
+ */
+ private int getOpcode() {
+ return encoding.ordinal() << 6;
+ }
+
+ private void writeDirectValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit packing the zigzag encoded literals
+ SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length,
+ zzBits100p, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
+ int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // write encoding type in top 2 bits
+ int header = getOpcode();
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the repeating value in big endian byte order
+ for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
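+    // e.g. (illustrative) the signed value 10000 repeated 4 times zigzags
+    // to 20000 (two bytes), producing header 0x09 followed by 0x4E 0x20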
+
+ fixedRunLength = 0;
+ }
+
+ private void determineEncoding() {
+ // used for direct encoding
+ zigzagLiterals = new long[numLiterals];
+
+ // used for patched base encoding
+ baseRedLiterals = new long[numLiterals];
+
+ // used for delta encoding
+ adjDeltas = new long[numLiterals - 1];
+
+ int idx = 0;
+
+ // for identifying monotonic sequences
+ boolean isIncreasing = false;
+ int increasingCount = 1;
+ boolean isDecreasing = false;
+ int decreasingCount = 1;
+
+ // for identifying type of delta encoding
+ min = literals[0];
+ long max = literals[0];
+ isFixedDelta = true;
+ long currDelta = 0;
+
+ long deltaMax = 0;
+
+ // populate all variables to identify the encoding type
+ if (numLiterals >= 1) {
+ currDelta = literals[1] - literals[0];
+ for(int i = 0; i < numLiterals; i++) {
+ if (i > 0 && literals[i] >= max) {
+ max = literals[i];
+ increasingCount++;
+ }
+
+ if (i > 0 && literals[i] <= min) {
+ min = literals[i];
+ decreasingCount++;
+ }
+
+        // if the delta changes then the sequence is no longer fixed delta
+ if (i > 0 && isFixedDelta) {
+ if (literals[i] - literals[i - 1] != currDelta) {
+ isFixedDelta = false;
+ }
+
+ fixedDelta = currDelta;
+ }
+
+ // populate zigzag encoded literals
+ long zzEncVal = 0;
+ if (signed) {
+ zzEncVal = SerializationUtils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ zigzagLiterals[idx] = zzEncVal;
+ idx++;
+
+ // max delta value is required for computing the fixed bits
+ // required for delta blob in delta encoding
+ if (i > 0) {
+ if (i == 1) {
+ // first value preserve the sign
+ adjDeltas[i - 1] = literals[i] - literals[i - 1];
+ } else {
+ adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
+ if (adjDeltas[i - 1] > deltaMax) {
+ deltaMax = adjDeltas[i - 1];
+ }
+ }
+ }
+ }
+
+ // stores the number of bits required for packing delta blob in
+ // delta encoding
+ bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+
+ // if decreasing count equals total number of literals then the
+ // sequence is monotonically decreasing
+ if (increasingCount == 1 && decreasingCount == numLiterals) {
+ isDecreasing = true;
+ }
+
+ // if increasing count equals total number of literals then the
+ // sequence is monotonically increasing
+ if (decreasingCount == 1 && increasingCount == numLiterals) {
+ isIncreasing = true;
+ }
+ }
+
+ // if the sequence is both increasing and decreasing then it is not
+ // monotonic
+ if (isDecreasing && isIncreasing) {
+ isDecreasing = false;
+ isIncreasing = false;
+ }
+
+ // fixed delta condition
+    if (!isIncreasing && !isDecreasing && isFixedDelta) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ // monotonic condition
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ // percentile values are computed for the zigzag encoded values. if the
+ // number of bit requirement between 90th and 100th percentile varies
+ // beyond a threshold then we need to patch the values. if the variation
+ // is not significant then we can use direct or delta encoding
+
+ double p = 0.9;
+ zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+ p = 1.0;
+ zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+ int diffBitsLH = zzBits100p - zzBits90p;
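+    // e.g. (illustrative) if 90% of the zigzag values fit in 4 bits but a
+    // few outliers need 16 bits, diffBitsLH = 12 and patching pays off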
+
+ // if the difference between 90th percentile and 100th percentile fixed
+ // bits is > 1 then we need patch the values
+    if (!isIncreasing && !isDecreasing && diffBitsLH > 1 && !isFixedDelta) {
+ // patching is done only on base reduced values.
+ // remove base from literals
+ for(int i = 0; i < zigzagLiterals.length; i++) {
+ baseRedLiterals[i] = literals[i] - min;
+ }
+
+ // 95th percentile width is used to determine max allowed value
+ // after which patching will be done
+ p = 0.95;
+ brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+ // 100th percentile is used to compute the max patch width
+ p = 1.0;
+ brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+ // after base reducing the values, if the difference in bits between
+ // 95th percentile and 100th percentile value is zero then there
+ // is no point in patching the values, in which case we will
+ // fallback to DIRECT encoding.
+ // The decision to use patched base was based on zigzag values, but the
+ // actual patching is done on base reduced literals.
+ if ((brBits100p - brBits95p) != 0) {
+ encoding = EncodingType.PATCHED_BASE;
+ preparePatchedBlob();
+ return;
+ } else {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ }
+
+ // if difference in bits between 95th percentile and 100th percentile is
+ // 0, then patch length will become 0. Hence we will fallback to direct
+    if (!isIncreasing && !isDecreasing && diffBitsLH <= 1 && !isFixedDelta) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // this should not happen
+ if (encoding == null) {
+ throw new RuntimeException("Integer encoding cannot be determined.");
+ }
+ }
+
+ private void preparePatchedBlob() {
+ // mask will be max value beyond which patch will be generated
+    long mask = (1L << brBits95p) - 1;
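+    // e.g. (illustrative) brBits95p = 4 gives mask = 0x0F, so any
+    // base-reduced value above 15 is treated as an outlier to patch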
+
+    // since we consider only the 95th percentile, the gap and patch
+    // arrays can contain at most 5% of the values
+ patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+ int[] gapList = new int[patchLength];
+ long[] patchList = new long[patchLength];
+
+ // #bit for patch
+ patchWidth = brBits100p - brBits95p;
+ patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+
+ int gapIdx = 0;
+ int patchIdx = 0;
+ int prev = 0;
+ int gap = 0;
+ int maxGap = 0;
+
+ for(int i = 0; i < baseRedLiterals.length; i++) {
+ // if value is above mask then create the patch and record the gap
+ if (baseRedLiterals[i] > mask) {
+ gap = i - prev;
+ if (gap > maxGap) {
+ maxGap = gap;
+ }
+
+ // gaps are relative, so store the previous patched value index
+ prev = i;
+ gapList[gapIdx++] = gap;
+
+ // extract the most significant bits that are over mask bits
+ long patch = baseRedLiterals[i] >>> brBits95p;
+ patchList[patchIdx++] = patch;
+
+ // strip off the MSB to enable safe bit packing
+ baseRedLiterals[i] &= mask;
+ }
+ }
+
+ // adjust the patch length to number of entries in gap list
+ patchLength = gapIdx;
+
+ // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need at least 1 bit
+ if (maxGap == 0 && patchLength != 0) {
+ patchGapWidth = 1;
+ } else {
+ patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+ }
+
+    // special case: if the patch gap is greater than 255 we need more than
+    // 8 bits to encode the gap, but only 3 bits are available in the header
+    // to record the gap width (widths 1 to 8). To deal with this case we
+    // save two entries in the patch list in the following way
+    // gap of 255 => 0 for patch value
+    // actual gap - 255 => actual patch value
+    // We do the same for a gap of 511. If the element to be patched is the
+    // last element in the scope then the gap will be 511. In this case we
+    // have 3 entries in the patch list in the following way
+    // gap of 255 => 0 for patch value
+    // gap of 255 => 0 for patch value
+    // gap of 1 => actual patch value
+ if (patchGapWidth > 8) {
+ patchGapWidth = 8;
+ // for gap = 511, we need two additional entries in patch list
+ if (maxGap == 511) {
+ patchLength += 2;
+ } else {
+ patchLength += 1;
+ }
+ }
+
+ // create gap vs patch list
+ gapIdx = 0;
+ patchIdx = 0;
+ gapVsPatchList = new long[patchLength];
+ for(int i = 0; i < patchLength; i++) {
+ long g = gapList[gapIdx++];
+ long p = patchList[patchIdx++];
+      while (g > 255) {
+        // use a long literal so the shift happens in 64 bits; a gap of 255
+        // with a zero patch value marks an overflow entry
+        gapVsPatchList[i++] = (255L << patchWidth);
+        g -= 255;
+      }
+
+ // store patch value in LSBs and gap in MSBs
+ gapVsPatchList[i] = (g << patchWidth) | p;
+ }
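+    // e.g. a gap of 600 is stored as three entries: (255, 0), (255, 0)
+    // and (90, patch)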
+ }
+
+ /**
+ * clears all the variables
+ */
+ private void clear() {
+ numLiterals = 0;
+ encoding = null;
+ prevDelta = 0;
+ zigzagLiterals = null;
+ baseRedLiterals = null;
+ adjDeltas = null;
+ fixedDelta = 0;
+ zzBits90p = 0;
+ zzBits100p = 0;
+ brBits95p = 0;
+ brBits100p = 0;
+ bitsDeltaMax = 0;
+ patchGapWidth = 0;
+ patchLength = 0;
+ patchWidth = 0;
+ gapVsPatchList = null;
+ min = 0;
+ isFixedDelta = false;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (numLiterals != 0) {
+ if (variableRunLength != 0) {
+ determineEncoding();
+ writeValues();
+ } else if (fixedRunLength != 0) {
+ if (fixedRunLength < MIN_REPEAT) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ determineEncoding();
+ writeValues();
+ } else if (fixedRunLength >= MIN_REPEAT
+ && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+ }
+ output.flush();
+ }
+
+ @Override
+ public void write(long val) throws IOException {
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ if (numLiterals == 1) {
+ prevDelta = val - literals[0];
+ literals[numLiterals++] = val;
+        // if both values are the same, count it as a fixed run, else as a
+        // variable run
+ if (val == literals[0]) {
+ fixedRunLength = 2;
+ variableRunLength = 0;
+ } else {
+ fixedRunLength = 0;
+ variableRunLength = 2;
+ }
+ } else {
+ long currentDelta = val - literals[numLiterals - 1];
+ if (prevDelta == 0 && currentDelta == 0) {
+ // fixed delta run
+
+ literals[numLiterals++] = val;
+
+          // if a variable run is in progress then these repeats are at its
+          // tail, so keep updating both the variable and fixed run lengths
+ if (variableRunLength > 0) {
+ fixedRunLength = 2;
+ }
+ fixedRunLength += 1;
+
+          // if the fixed run meets the minimum repeat threshold while a
+          // variable run is pending, flush the variable run and shift the
+          // tail fixed values to the start of the buffer
+ if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+ numLiterals -= MIN_REPEAT;
+ variableRunLength -= MIN_REPEAT - 1;
+ // copy the tail fixed runs
+ long[] tailVals = new long[MIN_REPEAT];
+ System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+ // determine variable encoding and flush values
+ determineEncoding();
+ writeValues();
+
+ // shift tail fixed runs to beginning of the buffer
+ for(long l : tailVals) {
+ literals[numLiterals++] = l;
+ }
+ }
+
+ // if fixed runs reached max repeat length then write values
+ if (fixedRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ } else {
+ // variable delta run
+
+ // if fixed run length is non-zero and if it satisfies the
+ // short repeat conditions then write the values as short repeats
+ // else use delta encoding
+ if (fixedRunLength >= MIN_REPEAT) {
+ if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+
+          // if the pending fixed run is shorter than MIN_REPEAT and the
+          // current value differs from the previous one, fold the fixed
+          // run into a variable run
+ if (val != literals[numLiterals - 1]) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ }
+ }
+
+ // after writing values re-initialize the variables
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // keep updating variable run lengths
+ prevDelta = val - literals[numLiterals - 1];
+ literals[numLiterals++] = val;
+ variableRunLength += 1;
+
+          // if the variable run length reaches the max scope, write it out
+ if (variableRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void initializeLiterals(long val) {
+ literals[numLiterals++] = val;
+ fixedRunLength = 1;
+ variableRunLength = 1;
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
index 67762b5..1cd84f9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
@@ -185,4 +185,279 @@ static BigInteger readBigInteger(InputStream input) throws IOException {
result = result.shiftRight(1);
return result;
}
+
+ enum FixedBitSizes {
+ ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+ THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+ TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+ TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+ }
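+  // the enum ordinal is the on-disk encoding of the bit width: ordinals
+  // 0-23 map to widths 1-24 and the remaining ordinals map to 26, 28, 30,
+  // 32, 40, 48, 56 and 64 (see encodeBitWidth/decodeBitWidth below)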
+
+  /**
+   * Count the number of bits required to encode the given value, rounded up
+   * to the closest available fixed bit width
+   * @param value
+   * @return closest fixed number of bits required to store the value
+   */
+ static int findClosestNumBits(long value) {
+ int count = 0;
+ while (value > 0) {
+ count++;
+ value = value >>> 1;
+ }
+ return getClosestFixedBits(count);
+ }
+
+ /**
+ * zigzag encode the given value
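+   * (e.g. 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4)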
+ * @param val
+ * @return zigzag encoded value
+ */
+ static long zigzagEncode(long val) {
+ return (val << 1) ^ (val >> 63);
+ }
+
+ /**
+ * zigzag decode the given value
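+   * (e.g. 1 -> -1, 2 -> 1, 3 -> -2)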
+ * @param val
+   * @return zigzag decoded value
+ */
+ static long zigzagDecode(long val) {
+ return (val >>> 1) ^ -(val & 1);
+ }
+
+ /**
+ * Compute the bits required to represent pth percentile value
+ * @param data - array
+   * @param p - percentile value (> 0.0 and <= 1.0)
+ * @return pth percentile bits
+ */
+ static int percentileBits(long[] data, double p) {
+ if ((p > 1.0) || (p <= 0.0)) {
+ return -1;
+ }
+
+    // histogram that stores the encoded bit requirement of each value.
+    // the maximum number of encoded bit widths is 32 (refer FixedBitSizes)
+ int[] hist = new int[32];
+
+ // compute the histogram
+ for(long l : data) {
+ int idx = encodeBitWidth(findClosestNumBits(l));
+ hist[idx] += 1;
+ }
+
+ int len = data.length;
+ int perLen = (int) (len * (1.0 - p));
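+    // perLen is the number of values allowed to exceed the returned width,
+    // e.g. 5 out of 100 values at p = 0.95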
+
+ // return the bits required by pth percentile length
+ for(int i = hist.length - 1; i >= 0; i--) {
+ perLen -= hist[i];
+ if (perLen < 0) {
+ return decodeBitWidth(i);
+ }
+ }
+
+ return 0;
+ }
+
+  /**
+   * Read n bytes from the input stream in big endian order and convert them
+   * to a long
+   * @param input - input stream
+   * @param n - number of bytes to read
+   * @return long value
+   */
+ static long bytesToLongBE(InStream input, int n) throws IOException {
+ long out = 0;
+ long val = 0;
+ while (n > 0) {
+ n--;
+ // store it in a long and then shift else integer overflow will occur
+ val = input.read();
+ out |= (val << (n * 8));
+ }
+ return out;
+ }
+
+ /**
+ * Calculate the number of bytes required
+ * @param n - number of values
+ * @param numBits - bit width
+ * @return number of bytes required
+ */
+ static int getTotalBytesRequired(int n, int numBits) {
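+    // e.g. n = 10, numBits = 3 -> 30 bits -> (30 + 7) / 8 = 4 bytes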
+ return (n * numBits + 7) / 8;
+ }
+
+  /**
+   * For a given bit width this function will return the closest available
+   * fixed bit width
+   * @param n
+   * @return closest valid fixed bit width
+   */
+ static int getClosestFixedBits(int n) {
+ if (n == 0) {
+ return 1;
+ }
+
+ if (n >= 1 && n <= 24) {
+ return n;
+ } else if (n > 24 && n <= 26) {
+ return 26;
+ } else if (n > 26 && n <= 28) {
+ return 28;
+ } else if (n > 28 && n <= 30) {
+ return 30;
+ } else if (n > 30 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Finds the closest available fixed bit width match and returns its encoded
+ * value (ordinal)
+ * @param n - fixed bit width to encode
+ * @return encoded fixed bit width
+ */
+ static int encodeBitWidth(int n) {
+ n = getClosestFixedBits(n);
+
+ if (n >= 1 && n <= 24) {
+ return n - 1;
+ } else if (n > 24 && n <= 26) {
+ return FixedBitSizes.TWENTYSIX.ordinal();
+ } else if (n > 26 && n <= 28) {
+ return FixedBitSizes.TWENTYEIGHT.ordinal();
+ } else if (n > 28 && n <= 30) {
+ return FixedBitSizes.THIRTY.ordinal();
+ } else if (n > 30 && n <= 32) {
+ return FixedBitSizes.THIRTYTWO.ordinal();
+ } else if (n > 32 && n <= 40) {
+ return FixedBitSizes.FORTY.ordinal();
+ } else if (n > 40 && n <= 48) {
+ return FixedBitSizes.FORTYEIGHT.ordinal();
+ } else if (n > 48 && n <= 56) {
+ return FixedBitSizes.FIFTYSIX.ordinal();
+ } else {
+ return FixedBitSizes.SIXTYFOUR.ordinal();
+ }
+ }
+
+ /**
+ * Decodes the ordinal fixed bit value to actual fixed bit width value
+ * @param n - encoded fixed bit width
+ * @return decoded fixed bit width
+ */
+ static int decodeBitWidth(int n) {
+ if (n >= FixedBitSizes.ONE.ordinal()
+ && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+ return n + 1;
+ } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+ return 26;
+ } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+ return 28;
+ } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+ return 30;
+ } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+ return 32;
+ } else if (n == FixedBitSizes.FORTY.ordinal()) {
+ return 40;
+ } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+ return 48;
+ } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Bitpack and write the input values to underlying output stream
+ * @param input - values to write
+ * @param offset - offset
+ * @param len - length
+ * @param bitSize - bit width
+ * @param output - output stream
+ * @throws IOException
+ */
+ static void writeInts(long[] input, int offset, int len, int bitSize,
+ OutputStream output) throws IOException {
+ if (input == null || input.length < 1 || offset < 0 || len < 1
+ || bitSize < 1) {
+ return;
+ }
+
+ int bitsLeft = 8;
+ byte current = 0;
+ for(int i = offset; i < (offset + len); i++) {
+ long value = input[i];
+ int bitsToWrite = bitSize;
+ while (bitsToWrite > bitsLeft) {
+ // add the bits to the bottom of the current word
+ current |= value >>> (bitsToWrite - bitsLeft);
+ // subtract out the bits we just added
+ bitsToWrite -= bitsLeft;
+ // zero out the bits above bitsToWrite
+ value &= (1L << bitsToWrite) - 1;
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ bitsLeft -= bitsToWrite;
+ current |= value << bitsLeft;
+ if (bitsLeft == 0) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ // flush
+ if (bitsLeft != 8) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ /**
+ * Read bitpacked integers from input stream
+ * @param buffer - input buffer
+ * @param offset - offset
+ * @param len - length
+ * @param bitSize - bit width
+ * @param input - input stream
+ * @throws IOException
+ */
+ static void readInts(long[] buffer, int offset, int len, int bitSize,
+ InStream input) throws IOException {
+ int bitsLeft = 0;
+ int current = 0;
+
+ for(int i = offset; i < (offset + len); i++) {
+ long result = 0;
+ int bitsLeftToRead = bitSize;
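+      // consume whole input bytes while the value still needs more bits
+      // than remain buffered in 'current'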
+ while (bitsLeftToRead > bitsLeft) {
+ result <<= bitsLeft;
+ result |= current & ((1 << bitsLeft) - 1);
+ bitsLeftToRead -= bitsLeft;
+ current = input.read();
+ bitsLeft = 8;
+ }
+
+ // handle the left over bits
+ if (bitsLeftToRead > 0) {
+ result <<= bitsLeftToRead;
+ bitsLeft -= bitsLeftToRead;
+ result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+ }
+ buffer[i] = result;
+ }
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index e2d7e56..c1052c3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -381,6 +381,7 @@ public Configuration getConfiguration() {
private final PositionedOutputStream rowIndexStream;
private boolean foundNulls;
private OutStream isPresentOutStream;
+ protected final boolean useDirectV2Encoding;
/**
* Create a tree writer.
@@ -396,6 +397,7 @@ public Configuration getConfiguration() {
this.isCompressed = streamFactory.isCompressed();
this.id = columnId;
this.inspector = inspector;
+ this.useDirectV2Encoding = true;
if (nullable) {
isPresentOutStream = streamFactory.createStream(id,
OrcProto.Stream.Kind.PRESENT);
@@ -430,6 +432,32 @@ protected ColumnStatisticsImpl getFileStatistics() {
return rowIndexEntry;
}
+ IntegerWriter createIntegerWriter(PositionedOutputStream output,
+ boolean signed, boolean isDirectV2) {
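+    // columns written with the new format get the RLE v2 writer; all other
+    // columns keep the original run length writer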
+ if (isDirectV2) {
+ return new RunLengthIntegerWriterV2(output, signed);
+ } else {
+ return new RunLengthIntegerWriter(output, signed);
+ }
+ }
+
+ boolean isNewWriteFormat(StreamFactory writer) {
+ String writeFormat = writer.getConfiguration().get(
+ HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+ if (writeFormat == null) {
+ LOG.warn("ORC write format not defined. Using 0.12 ORC write format.");
+ return true;
+ }
+ if (writeFormat
+ .equals(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.defaultVal)) {
+ LOG.info("Using 0.11 ORC write format.");
+ return false;
+ }
+
+ LOG.info("Using 0.12 ORC write format.");
+ return true;
+ }
+
/**
* Add a new value to the column.
* @param obj
@@ -635,10 +663,11 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class IntegerTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter writer;
+ private final IntegerWriter writer;
private final ShortObjectInspector shortInspector;
private final IntObjectInspector intInspector;
private final LongObjectInspector longInspector;
+ private boolean isDirectV2 = true;
IntegerTreeWriter(int columnId,
ObjectInspector inspector,
@@ -647,7 +676,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(out, true, isDirectV2);
if (inspector instanceof IntObjectInspector) {
intInspector = (IntObjectInspector) inspector;
shortInspector = null;
@@ -666,6 +696,16 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -776,13 +816,13 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class StringTreeWriter extends TreeWriter {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final OutStream stringOutput;
- private final RunLengthIntegerWriter lengthOutput;
- private final RunLengthIntegerWriter rowOutput;
+ private final IntegerWriter lengthOutput;
+ private final IntegerWriter rowOutput;
private final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
private final DynamicIntArray rows = new DynamicIntArray();
private final PositionedOutputStream directStreamOutput;
- private final RunLengthIntegerWriter directLengthOutput;
+ private final IntegerWriter directLengthOutput;
    private final List<OrcProto.RowIndexEntry> savedRowIndex =
        new ArrayList<OrcProto.RowIndexEntry>();
private final boolean buildIndex;
@@ -791,25 +831,26 @@ void recordPosition(PositionRecorder recorder) throws IOException {
//the total number of non-null rows, turn off dictionary encoding
private final float dictionaryKeySizeThreshold;
private boolean useDictionaryEncoding = true;
+ private boolean isDirectV2 = true;
StringTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
- rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false);
+ lengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ rowOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false, isDirectV2);
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- directLengthOutput =
- new RunLengthIntegerWriter(writer.createStream
- (id, OrcProto.Stream.Kind.LENGTH), false);
+ directLengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
@@ -906,24 +947,23 @@ public void visit(StringRedBlackTree.VisitorContext context
rowIndexValueCount.add(0L);
}
- // Calls getPosition on the row output stream if dictionary encoding is used, and the direct
- // output stream if direct encoding is used
- private void recordOutputPosition(OrcProto.RowIndexEntry.Builder base) throws IOException {
- if (useDictionaryEncoding) {
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- } else {
- directStreamOutput.getPosition(new RowIndexPositionRecorder(base));
- }
- }
-
@Override
OrcProto.ColumnEncoding getEncoding() {
// Returns the encoding used for the last call to writeStripe
if (useDictionaryEncoding) {
+      if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+ setDictionarySize(dictionary.size()).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DICTIONARY).
setDictionarySize(dictionary.size()).build();
} else {
+      if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DIRECT).build();
}
@@ -956,7 +996,8 @@ long estimateMemory() {
private static class BinaryTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
- private final RunLengthIntegerWriter length;
+ private final IntegerWriter length;
+ private boolean isDirectV2 = true;
BinaryTreeWriter(int columnId,
ObjectInspector inspector,
@@ -965,12 +1006,23 @@ long estimateMemory() {
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.length = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.length = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1003,22 +1055,34 @@ void recordPosition(PositionRecorder recorder) throws IOException {
Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
private static class TimestampTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter seconds;
- private final RunLengthIntegerWriter nanos;
+ private final IntegerWriter seconds;
+ private final IntegerWriter nanos;
+ private final boolean isDirectV2;
TimestampTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
- this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true);
- this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.seconds = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true, isDirectV2);
+ this.nanos = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1064,7 +1128,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class DateTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter writer;
+ private final IntegerWriter writer;
+ private final boolean isDirectV2;
DateTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1073,7 +1138,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(out, true, isDirectV2);
recordPosition(rowIndexPosition);
}
@@ -1101,24 +1167,46 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
writer.getPosition(recorder);
}
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
}
private static class DecimalTreeWriter extends TreeWriter {
private final PositionedOutputStream valueStream;
- private final RunLengthIntegerWriter scaleStream;
+ private final IntegerWriter scaleStream;
+ private final boolean isDirectV2;
DecimalTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true);
+ this.scaleStream = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1191,25 +1279,36 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
}
private static class ListTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
ListTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
childrenWriters = new TreeWriter[1];
childrenWriters[0] =
createTreeWriter(listObjectInspector.getListElementObjectInspector(),
writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1241,26 +1340,37 @@ void recordPosition(PositionRecorder recorder) throws IOException {
}
private static class MapTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
MapTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
MapObjectInspector insp = (MapObjectInspector) inspector;
childrenWriters = new TreeWriter[2];
childrenWriters[0] =
createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
childrenWriters[1] =
createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1346,8 +1456,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static TreeWriter createTreeWriter(ObjectInspector inspector,
StreamFactory streamFactory,
- boolean nullable
- ) throws IOException {
+ boolean nullable) throws IOException {
switch (inspector.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index 417c37a..d5bea25 100644
--- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -73,6 +73,8 @@ message ColumnEncoding {
enum Kind {
DIRECT = 0;
DICTIONARY = 1;
+ DIRECT_V2 = 2;
+ DICTIONARY_V2 = 3;
}
required Kind kind = 1;
optional uint32 dictionarySize = 2;
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
new file mode 100644
index 0000000..c0f765d
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+public class TestBitPack {
+
+ private static final int SIZE = 100;
+ private static Random rand = new Random(100);
+
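+  // note: despite its name this helper zigzag encodes each value so that
+  // negative inputs pack into small unsigned bit widths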
+ private long[] deltaEncode(long[] inp) {
+ long[] output = new long[inp.length];
+ for(int i = 0; i < inp.length; i++) {
+ output[i] = SerializationUtils.zigzagEncode(inp[i]);
+ }
+ return output;
+ }
+
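+  // returns a uniformly distributed value in [0, n); the do/while rejects
+  // the biased tail of the long range to avoid modulo bias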
+ private long nextLong(Random rng, long n) {
+ long bits, val;
+ do {
+ bits = (rng.nextLong() << 1) >>> 1;
+ val = bits % n;
+ } while (bits - val + (n - 1) < 0L);
+ return val;
+ }
+
+ private void runTest(int numBits) throws IOException {
+ long[] inp = new long[SIZE];
+ for(int i = 0; i < SIZE; i++) {
+ long val = 0;
+ if (numBits <= 32) {
+ if (numBits == 1) {
+ val = -1 * rand.nextInt(2);
+ } else {
+ val = rand.nextInt((int) Math.pow(2, numBits - 1));
+ }
+ } else {
+ val = nextLong(rand, (long) Math.pow(2, numBits - 2));
+ }
+ if (val % 2 == 0) {
+ val = -val;
+ }
+ inp[i] = val;
+ }
+ long[] deltaEncoded = deltaEncode(inp);
+ long minInput = Collections.min(Longs.asList(deltaEncoded));
+ long maxInput = Collections.max(Longs.asList(deltaEncoded));
+ long rangeInput = maxInput - minInput;
+ int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ OutStream output = new OutStream("test", SIZE, null, collect);
+ SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length,
+ fixedWidth, output);
+ output.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ long[] buff = new long[SIZE];
+ SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
+ InStream.create("test", inBuf, null, SIZE));
+ for(int i = 0; i < SIZE; i++) {
+ buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+ }
+ assertEquals(numBits, fixedWidth);
+ assertArrayEquals(inp, buff);
+ }
+
+ @Test
+ public void test01BitPacking1Bit() throws IOException {
+ runTest(1);
+ }
+
+ @Test
+ public void test02BitPacking2Bit() throws IOException {
+ runTest(2);
+ }
+
+ @Test
+ public void test03BitPacking3Bit() throws IOException {
+ runTest(3);
+ }
+
+ @Test
+ public void test04BitPacking4Bit() throws IOException {
+ runTest(4);
+ }
+
+ @Test
+ public void test05BitPacking5Bit() throws IOException {
+ runTest(5);
+ }
+
+ @Test
+ public void test06BitPacking6Bit() throws IOException {
+ runTest(6);
+ }
+
+ @Test
+ public void test07BitPacking7Bit() throws IOException {
+ runTest(7);
+ }
+
+ @Test
+ public void test08BitPacking8Bit() throws IOException {
+ runTest(8);
+ }
+
+ @Test
+ public void test09BitPacking9Bit() throws IOException {
+ runTest(9);
+ }
+
+ @Test
+ public void test10BitPacking10Bit() throws IOException {
+ runTest(10);
+ }
+
+ @Test
+ public void test11BitPacking11Bit() throws IOException {
+ runTest(11);
+ }
+
+ @Test
+ public void test12BitPacking12Bit() throws IOException {
+ runTest(12);
+ }
+
+ @Test
+ public void test13BitPacking13Bit() throws IOException {
+ runTest(13);
+ }
+
+ @Test
+ public void test14BitPacking14Bit() throws IOException {
+ runTest(14);
+ }
+
+ @Test
+ public void test15BitPacking15Bit() throws IOException {
+ runTest(15);
+ }
+
+ @Test
+ public void test16BitPacking16Bit() throws IOException {
+ runTest(16);
+ }
+
+ @Test
+ public void test17BitPacking17Bit() throws IOException {
+ runTest(17);
+ }
+
+ @Test
+ public void test18BitPacking18Bit() throws IOException {
+ runTest(18);
+ }
+
+ @Test
+ public void test19BitPacking19Bit() throws IOException {
+ runTest(19);
+ }
+
+ @Test
+ public void test20BitPacking20Bit() throws IOException {
+ runTest(20);
+ }
+
+ @Test
+ public void test21BitPacking21Bit() throws IOException {
+ runTest(21);
+ }
+
+ @Test
+ public void test22BitPacking22Bit() throws IOException {
+ runTest(22);
+ }
+
+ @Test
+ public void test23BitPacking23Bit() throws IOException {
+ runTest(23);
+ }
+
+ @Test
+ public void test24BitPacking24Bit() throws IOException {
+ runTest(24);
+ }
+
+ @Test
+ public void test26BitPacking26Bit() throws IOException {
+ runTest(26);
+ }
+
+ @Test
+ public void test28BitPacking28Bit() throws IOException {
+ runTest(28);
+ }
+
+ @Test
+ public void test30BitPacking30Bit() throws IOException {
+ runTest(30);
+ }
+
+ @Test
+ public void test32BitPacking32Bit() throws IOException {
+ runTest(32);
+ }
+
+ @Test
+ public void test40BitPacking40Bit() throws IOException {
+ runTest(40);
+ }
+
+ @Test
+ public void test48BitPacking48Bit() throws IOException {
+ runTest(48);
+ }
+
+ @Test
+ public void test56BitPacking56Bit() throws IOException {
+ runTest(56);
+ }
+
+ @Test
+ public void test64BitPacking64Bit() throws IOException {
+ runTest(64);
+ }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
new file mode 100644
index 0000000..0fb0fcb
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class TestIntegerCompressionReader {
+
+ public void runSeekTest(CompressionCodec codec) throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+ new OutStream("test", 1000, codec, collect), true);
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[4096];
+ Random random = new Random(99);
+ int[] junk = new int[2048];
+ for(int i=0; i < junk.length; ++i) {
+ junk[i] = random.nextInt();
+ }
+ for(int i=0; i < 4096; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ // test runs, incrementing runs, non-runs
+ if (i < 1024) {
+ out.write(i/4);
+ } else if (i < 2048) {
+ out.write(2*i);
+ } else {
+ out.write(junk[i-2048]);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+ ("test", inBuf, codec, 1000), true);
+    for(int i=0; i < 4096; ++i) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+    for(int i=4095; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+ new OutStream("test", 100, null, collect), true);
+ for(int i=0; i < 2048; ++i) {
+ if (i < 1024) {
+ out.write(i);
+ } else {
+ out.write(256 * i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+ ("test", inBuf, null, 100), true);
+ for(int i=0; i < 2048; i += 10) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i, x);
+ } else {
+ assertEquals(256 * i, x);
+ }
+ if (i < 2038) {
+ in.skip(9);
+ }
+ in.skip(0);
+ }
+ }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
new file mode 100644
index 0000000..4d3183e
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -0,0 +1,824 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+public class TestNewIntegerEncoding {
+
+ public static class Row {
+ Integer int1;
+ Long long1;
+
+ public Row(int val, long l) {
+ this.int1 = val;
+ this.long1 = l;
+ }
+ }
+
+  public List<Long> fetchData(String path) throws IOException {
+    List<Long> input = new ArrayList<Long>();
+ FileInputStream stream = new FileInputStream(new File(path));
+ try {
+ FileChannel fc = stream.getChannel();
+ MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+      /* this uses the default charset; an explicit decoder could be passed
+         instead */
+ String[] lines = Charset.defaultCharset().decode(bb).toString()
+ .split("\n");
+ for(String line : lines) {
+ long val = 0;
+ try {
+ val = Long.parseLong(line);
+ } catch (NumberFormatException e) {
+          // for now ignore non-numeric lines (treat them as 0)
+ }
+ input.add(val);
+ }
+ } finally {
+ stream.close();
+ }
+ return input;
+ }
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir", "target"
+ + File.separator + "test" + File.separator + "tmp"));
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+ String resDir = "ql/src/test/resources";
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ testFilePath = new Path(workDir, "TestOrcFile."
+ + testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
+
+ @Test
+ public void testBasicRow() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ writer.addRow(new Row(111, 1111L));
+ writer.addRow(new Row(111, 1111L));
+ writer.addRow(new Row(111, 1111L));
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0));
+ assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1));
+ }
+ }
+
+ @Test
+ public void testBasicOld() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+ 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+ 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+ 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+ 1, 1, 1, 1 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+ conf.set("hive.exec.orc.write.format", "0.11");
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicNew() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+ 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+ 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+ 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+ 1, 1, 1, 1 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta1() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { -500, -400, -350, -325, -310 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta2() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { -500, -600, -650, -675, -710 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta3() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 500, 400, 350, 325, 310 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testBasicDelta4() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 500, 600, 650, 675, 710 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testIntegerMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+    List<Long> input = Lists.newArrayList();
+ input.add((long) Integer.MIN_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testIntegerMax() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+    List<Long> input = Lists.newArrayList();
+ input.add((long) Integer.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testLongMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+    List<Long> input = Lists.newArrayList();
+ input.add(Long.MIN_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testLongMax() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+    List<Long> input = Lists.newArrayList();
+ input.add(Long.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testRandomInt() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+    List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 100000; i++) {
+ input.add((long) rand.nextInt());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testRandomLong() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+    List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 100000; i++) {
+ input.add(rand.nextLong());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+ 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+ 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+ 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+ 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -13, 1, 2, 3,
+ 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+ 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+ 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+ 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+ 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+ 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+ 2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin2() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+ 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+ 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+ 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+ 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -1, 1, 2, 3,
+ 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+ 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+ 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+ 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+ 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+ 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+ 2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin3() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+ 3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+ 1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+ 52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+ 2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, 0, 1, 2, 3,
+ 13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+ 141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+ 13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+ 1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+ 2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+ 1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+ 2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseNegativeMin4() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 13, 13, 11, 8, 13, 10, 10, 11, 11, 14, 11, 7, 13,
+ 12, 12, 11, 15, 12, 12, 9, 8, 10, 13, 11, 8, 6, 5, 6, 11, 7, 15, 10, 7,
+ 6, 8, 7, 9, 9, 11, 33, 11, 3, 7, 4, 6, 10, 14, 12, 5, 14, 7, 6 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
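+ // The testPatchedBaseAt* cases plant one large outlier (20000 among values
+ // below 100) at a chosen index to probe where the patch falls inside the
+ // 512-value runs that PATCHED_BASE encodes.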
+ @Test
+ public void testPatchedBaseAt0() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(0, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt1() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(1, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
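+ // A patch gap is stored in at most 8 bits, so 255 is the largest single
+ // gap: an outlier at index 255 fits one patch entry, while index 256 needs
+ // a zero-valued filler patch first. ZLIB also covers the compressed path.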
+ @Test
+ public void testPatchedBaseAt255() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(255, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt256() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(256, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
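+ // Indices 510 and 511 sit at the tail of the run and can only be reached
+ // by chaining 255-wide gaps with zero-valued filler patches in between.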
+ @Test
+ public void testPatchedBase510() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(510, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBase511() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(511, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
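+ // Unlike the cases above, testSeek forces the legacy 0.11 writer format via
+ // hive.exec.orc.write.format, then checks seekToRow followed by a forward
+ // scan against the expected values.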
+ @Test
+ public void testSeek() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for(int i = 0; i < 100000; i++) {
+ input.add((long) rand.nextInt());
+ }
+ conf.set("hive.exec.orc.write.format", "0.11");
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.NONE, 10000, 10000);
+ for(Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 55555;
+ rows.seekToRow(idx);
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+}
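
For reference, the tests above flip the writer format through a plain
Configuration entry. A minimal sketch of opting back into the 0.11 encoding
outside the test harness (fs and inspector as in the tests; the path is
hypothetical):

    Configuration conf = new Configuration();
    // "0.11" keeps the old RLE writer; any other value, or leaving the key
    // unset, makes the writer emit the new *_V2 column encodings.
    conf.set("hive.exec.orc.write.format", "0.11");
    Writer writer = OrcFile.createWriter(fs, new Path("/tmp/legacy.orc"),
        conf, inspector, 100000, CompressionKind.NONE, 10000, 10000);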
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
index 9f989fd..2f7a7f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.ql.io.orc;
import static junit.framework.Assert.assertEquals;
diff --git ql/src/test/resources/orc-file-dump-dictionary-threshold.out ql/src/test/resources/orc-file-dump-dictionary-threshold.out
index c486f4d..003c132 100644
--- ql/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ ql/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -11,68 +11,68 @@ Statistics:
Column 3: count: 21000 min: Darkness,-230 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936
Stripes:
- Stripe: offset: 3 data: 107035 rows: 4000 tail: 65 index: 217
+ Stripe: offset: 3 data: 102311 rows: 4000 tail: 68 index: 217
Stream: column 0 section ROW_INDEX start: 3 length 10
Stream: column 1 section ROW_INDEX start: 13 length 36
Stream: column 2 section ROW_INDEX start: 49 length 39
Stream: column 3 section ROW_INDEX start: 88 length 132
- Stream: column 1 section DATA start: 220 length 18043
- Stream: column 2 section DATA start: 18263 length 34740
- Stream: column 3 section DATA start: 53003 length 50887
- Stream: column 3 section LENGTH start: 103890 length 3365
+ Stream: column 1 section DATA start: 220 length 16022
+ Stream: column 2 section DATA start: 16242 length 32028
+ Stream: column 3 section DATA start: 48270 length 50887
+ Stream: column 3 section LENGTH start: 99157 length 3374
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DIRECT
- Stripe: offset: 107320 data: 289727 rows: 5000 tail: 65 index: 349
- Stream: column 0 section ROW_INDEX start: 107320 length 10
- Stream: column 1 section ROW_INDEX start: 107330 length 36
- Stream: column 2 section ROW_INDEX start: 107366 length 39
- Stream: column 3 section ROW_INDEX start: 107405 length 264
- Stream: column 1 section DATA start: 107669 length 22581
- Stream: column 2 section DATA start: 130250 length 43426
- Stream: column 3 section DATA start: 173676 length 219588
- Stream: column 3 section LENGTH start: 393264 length 4132
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DIRECT_V2
+ Stripe: offset: 102599 data: 284999 rows: 5000 tail: 68 index: 349
+ Stream: column 0 section ROW_INDEX start: 102599 length 10
+ Stream: column 1 section ROW_INDEX start: 102609 length 36
+ Stream: column 2 section ROW_INDEX start: 102645 length 39
+ Stream: column 3 section ROW_INDEX start: 102684 length 264
+ Stream: column 1 section DATA start: 102948 length 20029
+ Stream: column 2 section DATA start: 122977 length 40035
+ Stream: column 3 section DATA start: 163012 length 219588
+ Stream: column 3 section LENGTH start: 382600 length 5347
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DIRECT
- Stripe: offset: 397461 data: 496162 rows: 5000 tail: 66 index: 536
- Stream: column 0 section ROW_INDEX start: 397461 length 10
- Stream: column 1 section ROW_INDEX start: 397471 length 36
- Stream: column 2 section ROW_INDEX start: 397507 length 39
- Stream: column 3 section ROW_INDEX start: 397546 length 451
- Stream: column 1 section DATA start: 397997 length 22605
- Stream: column 2 section DATA start: 420602 length 43444
- Stream: column 3 section DATA start: 464046 length 425862
- Stream: column 3 section LENGTH start: 889908 length 4251
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DIRECT_V2
+ Stripe: offset: 388015 data: 491655 rows: 5000 tail: 69 index: 536
+ Stream: column 0 section ROW_INDEX start: 388015 length 10
+ Stream: column 1 section ROW_INDEX start: 388025 length 36
+ Stream: column 2 section ROW_INDEX start: 388061 length 39
+ Stream: column 3 section ROW_INDEX start: 388100 length 451
+ Stream: column 1 section DATA start: 388551 length 20029
+ Stream: column 2 section DATA start: 408580 length 40035
+ Stream: column 3 section DATA start: 448615 length 425862
+ Stream: column 3 section LENGTH start: 874477 length 5729
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DIRECT
- Stripe: offset: 894225 data: 711982 rows: 5000 tail: 65 index: 677
- Stream: column 0 section ROW_INDEX start: 894225 length 10
- Stream: column 1 section ROW_INDEX start: 894235 length 36
- Stream: column 2 section ROW_INDEX start: 894271 length 39
- Stream: column 3 section ROW_INDEX start: 894310 length 592
- Stream: column 1 section DATA start: 894902 length 22591
- Stream: column 2 section DATA start: 917493 length 43414
- Stream: column 3 section DATA start: 960907 length 641580
- Stream: column 3 section LENGTH start: 1602487 length 4397
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DIRECT_V2
+ Stripe: offset: 880275 data: 707368 rows: 5000 tail: 68 index: 677
+ Stream: column 0 section ROW_INDEX start: 880275 length 10
+ Stream: column 1 section ROW_INDEX start: 880285 length 36
+ Stream: column 2 section ROW_INDEX start: 880321 length 39
+ Stream: column 3 section ROW_INDEX start: 880360 length 592
+ Stream: column 1 section DATA start: 880952 length 20029
+ Stream: column 2 section DATA start: 900981 length 40035
+ Stream: column 3 section DATA start: 941016 length 641580
+ Stream: column 3 section LENGTH start: 1582596 length 5724
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DIRECT
- Stripe: offset: 1606949 data: 350645 rows: 2000 tail: 66 index: 786
- Stream: column 0 section ROW_INDEX start: 1606949 length 10
- Stream: column 1 section ROW_INDEX start: 1606959 length 36
- Stream: column 2 section ROW_INDEX start: 1606995 length 39
- Stream: column 3 section ROW_INDEX start: 1607034 length 701
- Stream: column 1 section DATA start: 1607735 length 9027
- Stream: column 2 section DATA start: 1616762 length 17375
- Stream: column 3 section DATA start: 1634137 length 322259
- Stream: column 3 section LENGTH start: 1956396 length 1984
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DIRECT_V2
+ Stripe: offset: 1588388 data: 348697 rows: 2000 tail: 67 index: 786
+ Stream: column 0 section ROW_INDEX start: 1588388 length 10
+ Stream: column 1 section ROW_INDEX start: 1588398 length 36
+ Stream: column 2 section ROW_INDEX start: 1588434 length 39
+ Stream: column 3 section ROW_INDEX start: 1588473 length 701
+ Stream: column 1 section DATA start: 1589174 length 8011
+ Stream: column 2 section DATA start: 1597185 length 16014
+ Stream: column 3 section DATA start: 1613199 length 322259
+ Stream: column 3 section LENGTH start: 1935458 length 2413
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DIRECT
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DIRECT_V2
\ No newline at end of file
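
The .out baselines in this patch are expected output of the ORC file dump
utility; the updated numbers reflect the *_V2 encodings and the smaller
DATA/LENGTH streams the new RLE produces. A dump like the ones above can be
regenerated with the FileDump entry point in this package (path hypothetical):

    // Prints file statistics, stripe layout and per-column encodings.
    org.apache.hadoop.hive.ql.io.orc.FileDump.main(
        new String[]{"/tmp/orc-file-dump.orc"});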
diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out
index 8b88931..b250797 100644
--- ql/src/test/resources/orc-file-dump.out
+++ ql/src/test/resources/orc-file-dump.out
@@ -11,73 +11,73 @@ Statistics:
Column 3: count: 21000 min: Darkness, max: worst
Stripes:
- Stripe: offset: 3 data: 69605 rows: 5000 tail: 72 index: 119
+ Stripe: offset: 3 data: 63766 rows: 5000 tail: 74 index: 119
Stream: column 0 section ROW_INDEX start: 3 length 10
Stream: column 1 section ROW_INDEX start: 13 length 35
Stream: column 2 section ROW_INDEX start: 48 length 39
Stream: column 3 section ROW_INDEX start: 87 length 35
- Stream: column 1 section DATA start: 122 length 22605
- Stream: column 2 section DATA start: 22727 length 43426
- Stream: column 3 section DATA start: 66153 length 3403
- Stream: column 3 section LENGTH start: 69556 length 38
- Stream: column 3 section DICTIONARY_DATA start: 69594 length 133
+ Stream: column 1 section DATA start: 122 length 20029
+ Stream: column 2 section DATA start: 20151 length 40035
+ Stream: column 3 section DATA start: 60186 length 3544
+ Stream: column 3 section LENGTH start: 63730 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 63755 length 133
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
- Stripe: offset: 69799 data: 69584 rows: 5000 tail: 73 index: 118
- Stream: column 0 section ROW_INDEX start: 69799 length 10
- Stream: column 1 section ROW_INDEX start: 69809 length 34
- Stream: column 2 section ROW_INDEX start: 69843 length 39
- Stream: column 3 section ROW_INDEX start: 69882 length 35
- Stream: column 1 section DATA start: 69917 length 22597
- Stream: column 2 section DATA start: 92514 length 43439
- Stream: column 3 section DATA start: 135953 length 3377
- Stream: column 3 section LENGTH start: 139330 length 38
- Stream: column 3 section DICTIONARY_DATA start: 139368 length 133
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY_V2
+ Stripe: offset: 63962 data: 63755 rows: 5000 tail: 76 index: 118
+ Stream: column 0 section ROW_INDEX start: 63962 length 10
+ Stream: column 1 section ROW_INDEX start: 63972 length 34
+ Stream: column 2 section ROW_INDEX start: 64006 length 39
+ Stream: column 3 section ROW_INDEX start: 64045 length 35
+ Stream: column 1 section DATA start: 64080 length 20029
+ Stream: column 2 section DATA start: 84109 length 40035
+ Stream: column 3 section DATA start: 124144 length 3533
+ Stream: column 3 section LENGTH start: 127677 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 127702 length 133
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
- Stripe: offset: 139574 data: 69570 rows: 5000 tail: 73 index: 120
- Stream: column 0 section ROW_INDEX start: 139574 length 10
- Stream: column 1 section ROW_INDEX start: 139584 length 36
- Stream: column 2 section ROW_INDEX start: 139620 length 39
- Stream: column 3 section ROW_INDEX start: 139659 length 35
- Stream: column 1 section DATA start: 139694 length 22594
- Stream: column 2 section DATA start: 162288 length 43415
- Stream: column 3 section DATA start: 205703 length 3390
- Stream: column 3 section LENGTH start: 209093 length 38
- Stream: column 3 section DICTIONARY_DATA start: 209131 length 133
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY_V2
+ Stripe: offset: 127911 data: 63766 rows: 5000 tail: 76 index: 120
+ Stream: column 0 section ROW_INDEX start: 127911 length 10
+ Stream: column 1 section ROW_INDEX start: 127921 length 36
+ Stream: column 2 section ROW_INDEX start: 127957 length 39
+ Stream: column 3 section ROW_INDEX start: 127996 length 35
+ Stream: column 1 section DATA start: 128031 length 20029
+ Stream: column 2 section DATA start: 148060 length 40035
+ Stream: column 3 section DATA start: 188095 length 3544
+ Stream: column 3 section LENGTH start: 191639 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 191664 length 133
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
- Stripe: offset: 209337 data: 69551 rows: 5000 tail: 72 index: 119
- Stream: column 0 section ROW_INDEX start: 209337 length 10
- Stream: column 1 section ROW_INDEX start: 209347 length 35
- Stream: column 2 section ROW_INDEX start: 209382 length 39
- Stream: column 3 section ROW_INDEX start: 209421 length 35
- Stream: column 1 section DATA start: 209456 length 22575
- Stream: column 2 section DATA start: 232031 length 43426
- Stream: column 3 section DATA start: 275457 length 3379
- Stream: column 3 section LENGTH start: 278836 length 38
- Stream: column 3 section DICTIONARY_DATA start: 278874 length 133
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY_V2
+ Stripe: offset: 191873 data: 63796 rows: 5000 tail: 74 index: 119
+ Stream: column 0 section ROW_INDEX start: 191873 length 10
+ Stream: column 1 section ROW_INDEX start: 191883 length 35
+ Stream: column 2 section ROW_INDEX start: 191918 length 39
+ Stream: column 3 section ROW_INDEX start: 191957 length 35
+ Stream: column 1 section DATA start: 191992 length 20029
+ Stream: column 2 section DATA start: 212021 length 40035
+ Stream: column 3 section DATA start: 252056 length 3574
+ Stream: column 3 section LENGTH start: 255630 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 255655 length 133
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
- Stripe: offset: 279079 data: 14096 rows: 1000 tail: 68 index: 120
- Stream: column 0 section ROW_INDEX start: 279079 length 10
- Stream: column 1 section ROW_INDEX start: 279089 length 36
- Stream: column 2 section ROW_INDEX start: 279125 length 39
- Stream: column 3 section ROW_INDEX start: 279164 length 35
- Stream: column 1 section DATA start: 279199 length 4529
- Stream: column 2 section DATA start: 283728 length 8690
- Stream: column 3 section DATA start: 292418 length 706
- Stream: column 3 section LENGTH start: 293124 length 38
- Stream: column 3 section DICTIONARY_DATA start: 293162 length 133
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY_V2
+ Stripe: offset: 255862 data: 12940 rows: 1000 tail: 71 index: 120
+ Stream: column 0 section ROW_INDEX start: 255862 length 10
+ Stream: column 1 section ROW_INDEX start: 255872 length 36
+ Stream: column 2 section ROW_INDEX start: 255908 length 39
+ Stream: column 3 section ROW_INDEX start: 255947 length 35
+ Stream: column 1 section DATA start: 255982 length 4007
+ Stream: column 2 section DATA start: 259989 length 8007
+ Stream: column 3 section DATA start: 267996 length 768
+ Stream: column 3 section LENGTH start: 268764 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 268789 length 133
Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY_V2
\ No newline at end of file