diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index f6acc00..cce1415 100644
--- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -5157,10 +5157,12 @@ public ColumnEncoding getDefaultInstanceForType() {
implements com.google.protobuf.ProtocolMessageEnum {
DIRECT(0, 0),
DICTIONARY(1, 1),
+ DIRECT_V2(2, 2),
;
public static final int DIRECT_VALUE = 0;
public static final int DICTIONARY_VALUE = 1;
+ public static final int DIRECT_V2_VALUE = 2;
public final int getNumber() { return value; }
@@ -5169,6 +5171,7 @@ public static Kind valueOf(int value) {
switch (value) {
case 0: return DIRECT;
case 1: return DICTIONARY;
+ case 2: return DIRECT_V2;
default: return null;
}
}
@@ -5199,7 +5202,7 @@ public Kind findValueByNumber(int number) {
}
private static final Kind[] VALUES = {
- DIRECT, DICTIONARY,
+ DIRECT, DICTIONARY, DIRECT_V2,
};
public static Kind valueOf(
@@ -10568,41 +10571,42 @@ void setMagic(com.google.protobuf.ByteString value) {
"nd\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Ki" +
"nd\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n" +
"\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004" +
- "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n\016Colum",
+ "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\240\001\n\016Colum",
"nEncoding\022C\n\004kind\030\001 \002(\01625.org.apache.had" +
"oop.hive.ql.io.orc.ColumnEncoding.Kind\022\026" +
- "\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006DIRECT" +
- "\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFooter\0229\n\007s" +
- "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" +
- ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap" +
- "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
- "g\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" +
- "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" +
- "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\260\001\n\004Kind",
- "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" +
- "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" +
- "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" +
- "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" +
- "\007DECIMAL\020\016\"x\n\021StripeInformation\022\016\n\006offse" +
- "t\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLeng" +
- "th\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014number" +
- "OfRows\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name" +
- "\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Footer\022\024\n\014head" +
- "erLength\030\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n",
- "\007stripes\030\003 \003(\01323.org.apache.hadoop.hive." +
- "ql.io.orc.StripeInformation\0225\n\005types\030\004 \003" +
- "(\0132&.org.apache.hadoop.hive.ql.io.orc.Ty" +
- "pe\022D\n\010metadata\030\005 \003(\01322.org.apache.hadoop" +
- ".hive.ql.io.orc.UserMetadataItem\022\024\n\014numb" +
- "erOfRows\030\006 \001(\004\022F\n\nstatistics\030\007 \003(\01322.org" +
- ".apache.hadoop.hive.ql.io.orc.ColumnStat" +
- "istics\022\026\n\016rowIndexStride\030\010 \001(\r\"\255\001\n\nPostS" +
- "cript\022\024\n\014footerLength\030\001 \001(\004\022F\n\013compressi" +
- "on\030\002 \001(\01621.org.apache.hadoop.hive.ql.io.",
- "orc.CompressionKind\022\034\n\024compressionBlockS" +
- "ize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001\022\016\n\005magic\030" +
- "\300> \001(\t*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004Z" +
- "LIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
+ "\n\016dictionarySize\030\002 \001(\r\"1\n\004Kind\022\n\n\006DIRECT" +
+ "\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\"\214\001\n\014St" +
+ "ripeFooter\0229\n\007streams\030\001 \003(\0132(.org.apache" +
+ ".hadoop.hive.ql.io.orc.Stream\022A\n\007columns" +
+ "\030\002 \003(\01320.org.apache.hadoop.hive.ql.io.or" +
+ "c.ColumnEncoding\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162" +
+ "+.org.apache.hadoop.hive.ql.io.orc.Type." +
+ "Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames",
+ "\030\003 \003(\t\"\260\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t" +
+ "\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022" +
+ "\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tT" +
+ "IMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020" +
+ "\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\"x\n\021StripeInfor" +
+ "mation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 " +
+ "\001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030" +
+ "\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetad" +
+ "ataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n" +
+ "\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontent",
+ "Length\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apac" +
+ "he.hadoop.hive.ql.io.orc.StripeInformati" +
+ "on\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.hi" +
+ "ve.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.or" +
+ "g.apache.hadoop.hive.ql.io.orc.UserMetad" +
+ "ataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatist" +
+ "ics\030\007 \003(\01322.org.apache.hadoop.hive.ql.io" +
+ ".orc.ColumnStatistics\022\026\n\016rowIndexStride\030" +
+ "\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001" +
+ "(\004\022F\n\013compression\030\002 \001(\01621.org.apache.had",
+ "oop.hive.ql.io.orc.CompressionKind\022\034\n\024co" +
+ "mpressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(" +
+ "\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind" +
+ "\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020" +
+ "\003"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackReader.java
new file mode 100644
index 0000000..93baf6d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackReader.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+/**
+ * Reader which unpacks the bit packed values.
+ */
+class BitPackReader {
+ // current bit position
+ private int current;
+
+ // bits left in a byte
+ private int bitsLeft;
+ private int numRead;
+
+ // bit packed array
+ private byte[] packed;
+ private int numBits;
+
+ BitPackReader(int numBits) {
+ this.packed = null;
+ this.current = 0;
+ this.bitsLeft = 0;
+ this.numBits = numBits;
+ this.numRead = 0;
+ }
+
+ /**
+ * Reads next byte (as integer) from bit packed array
+ */
+ private void readByte() {
+ current = 0xff & packed[numRead++];
+ bitsLeft = 8;
+ }
+
+ /**
+ * Unpack the bit packed input array up to the input array's length.
+ *
+ * @param inp
+ * - bit packed array
+ * @return unpacked values. null is returned if input array is null or empty.
+ */
+ long[] unpack(byte[] inp) {
+ if (inp == null || inp.length == 0) {
+ return null;
+ }
+
+ return unpack(inp, inp.length);
+ }
+
+ /**
+ * Unpack the bit packed input array up to the specified number of elements.
+ *
+ * @param inp
+ * - bit packed array
+ * @param n
+ * - number of elements in the input array to unpack
+ * @return unpacked values. null for incorrect arguments
+ */
+ long[] unpack(byte[] inp, int n) {
+ if (inp == null || inp.length < 1 || n < 1 || numBits < 1) {
+ return null;
+ }
+
+ this.packed = inp;
+
+ // output unpacked array
+ long[] unpacked = new long[n];
+ if (inp.length > 0) {
+ readByte();
+ }
+
+ for (int i = 0; i < n; i++) {
+ long result = 0;
+ int bitsLeftToRead = numBits;
+ while (bitsLeftToRead > bitsLeft) {
+ result <<= bitsLeft;
+ result |= current & ((1 << bitsLeft) - 1);
+ bitsLeftToRead -= bitsLeft;
+ readByte();
+ }
+
+ // handle the left over bits
+ if (bitsLeftToRead > 0) {
+ result <<= bitsLeftToRead;
+ bitsLeft -= bitsLeftToRead;
+ result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+ }
+
+ unpacked[i] = result;
+ }
+ return unpacked;
+ }
+}
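For reference, a minimal sketch of how the unpack loop behaves, assuming the caller sits in the same org.apache.hadoop.hive.ql.io.orc package (the class is package-private). Values are stored MSB-first and may straddle byte boundaries:

    // 3-bit packing of {1, 2, 3, 4} produces the two bytes 0b00101001, 0b11000000
    BitPackReader reader = new BitPackReader(3);
    long[] values = reader.unpack(new byte[] { 0x29, (byte) 0xC0 }, 4);
    // values is {1, 2, 3, 4}; the third value takes 2 bits from the first byte
    // and 1 bit from the second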
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackWriter.java
new file mode 100644
index 0000000..24bc828
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackWriter.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+/**
+ * Writer which bit packs integer values.
+ */
+
+class BitPackWriter {
+ private byte[] packed;
+ private int numPacked;
+ private byte current;
+ private int bitsLeft;
+ private int numBits;
+
+ BitPackWriter(int numBits) {
+ this.packed = null;
+ this.numPacked = 0;
+ this.current = 0;
+ this.bitsLeft = 8;
+ this.numBits = numBits;
+ }
+
+ /**
+ * If there are any left over bits then flush the final byte
+ */
+ private void flush() {
+ if (bitsLeft != 8) {
+ writeByte();
+ }
+ }
+
+ private void writeByte() {
+ packed[numPacked++] = current;
+ current = 0;
+ bitsLeft = 8;
+ }
+
+ /**
+ * Bit packs all the values in the input array.
+ *
+ * @param inp
+ * - input array
+ * @return bit packed byte array, null returned for any illegal argument
+ */
+ byte[] pack(long[] inp) {
+ if (inp == null || inp.length == 0 || numBits < 1) {
+ return null;
+ }
+
+ return pack(inp, inp.length);
+ }
+
+ /**
+ * Bit packs the first n elements of the input array.
+ *
+ * @param inp
+ * - input array
+ * @param n
+ * - number of elements in the array to bit pack
+ * @return bit packed byte array, null returned for any illegal argument
+ */
+ byte[] pack(long[] inp, int n) {
+ if (inp == null || n < 1 || inp.length == 0 || numBits < 1) {
+ return null;
+ }
+
+ int totalBytes = Utils.getTotalBytesRequired(n, numBits);
+ packed = new byte[totalBytes];
+
+ for (int i = 0; i < n; i++) {
+ long value = inp[i];
+ int bitsToWrite = numBits;
+ while (bitsToWrite > bitsLeft) {
+ // add the bits to the bottom of the current word
+ current |= value >>> (bitsToWrite - bitsLeft);
+ // subtract out the bits we just added
+ bitsToWrite -= bitsLeft;
+ // zero out the bits above bitsToWrite
+ value &= (1L << bitsToWrite) - 1;
+ writeByte();
+ }
+ bitsLeft -= bitsToWrite;
+ current |= value << bitsLeft;
+ if (bitsLeft == 0) {
+ writeByte();
+ }
+ }
+
+ // flush any left over bits as a final byte
+ flush();
+ return packed;
+ }
+}
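A short round-trip sketch under the same assumption (same package, and Utils.getTotalBytesRequired returning ceil(n * numBits / 8)):

    BitPackWriter writer = new BitPackWriter(3);
    byte[] packed = writer.pack(new long[] { 1, 2, 3, 4 });   // 12 bits -> 2 bytes: 0x29, 0xC0
    long[] restored = new BitPackReader(3).unpack(packed, 4); // {1, 2, 3, 4}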
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixedBitSizes.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixedBitSizes.java
new file mode 100644
index 0000000..242f4fb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixedBitSizes.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+public enum FixedBitSizes {
+ ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+ THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, TWENTY,
+ TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, TWENTYEIGHT,
+ THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+
+ /**
+ * For a given fixed bit width this function returns the corresponding enum ordinal
+ * @param n
+ * @return ordinal value
+ */
+ public static int fixedBitsToOrdinal(int n) {
+ if (n == 0) {
+ return ONE.ordinal();
+ }
+
+ if (n >= 1 && n <= 24) {
+ return n - 1;
+ } else if (n > 24 && n <= 26) {
+ return TWENTYSIX.ordinal();
+ } else if (n > 26 && n <= 28) {
+ return TWENTYEIGHT.ordinal();
+ } else if (n > 28 && n <= 30) {
+ return THIRTY.ordinal();
+ } else if (n > 30 && n <= 32) {
+ return THIRTYTWO.ordinal();
+ } else if (n > 32 && n <= 40) {
+ return FORTY.ordinal();
+ } else if (n > 40 && n <= 48) {
+ return FORTYEIGHT.ordinal();
+ } else if (n > 48 && n <= 56) {
+ return FIFTYSIX.ordinal();
+ } else {
+ return SIXTYFOUR.ordinal();
+ }
+ }
+
+ /**
+ * For a given enum ordinal this function returns the corresponding fixed bit width
+ * @param n
+ * @return fixed bit value
+ */
+ public static int ordinalToFixedBits(int n) {
+
+ if (n >= ONE.ordinal() && n <= TWENTYFOUR.ordinal()) {
+ return n + 1;
+ } else if (n == TWENTYSIX.ordinal()) {
+ return 26;
+ } else if (n == TWENTYEIGHT.ordinal()) {
+ return 28;
+ } else if (n == THIRTY.ordinal()) {
+ return 30;
+ } else if (n == THIRTYTWO.ordinal()) {
+ return 32;
+ } else if (n == FORTY.ordinal()) {
+ return 40;
+ } else if (n == FORTYEIGHT.ordinal()) {
+ return 48;
+ } else if (n == FIFTYSIX.ordinal()) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * For a given bit width this function returns the closest supported fixed
+ * bit width
+ * @param n
+ * @return closest valid fixed bit
+ */
+ public static int getClosestFixedBits(int n) {
+ if (n == 0) {
+ return 1;
+ }
+
+ if (n >= 1 && n <= 24) {
+ return n;
+ } else if (n > 24 && n <= 26) {
+ return 26;
+ } else if (n > 26 && n <= 28) {
+ return 28;
+ } else if (n > 28 && n <= 30) {
+ return 30;
+ } else if (n > 30 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+}
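A small sketch of how the three helpers fit together: bit widths that are not directly representable are widened to the closest supported width, and only that width's ordinal travels in the 5-bit header field:

    int closest = FixedBitSizes.getClosestFixedBits(27);     // 28
    int ordinal = FixedBitSizes.fixedBitsToOrdinal(closest); // 25 (TWENTYEIGHT)
    int width   = FixedBitSizes.ordinalToFixedBits(ordinal); // back to 28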
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionReader.java
new file mode 100644
index 0000000..f68d4a7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionReader.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.IntegerCompressionWriter.EncodingType;
+
+/**
+ * A reader that reads a sequence of lightweight compressed integers. Refer to
+ * {@link IntegerCompressionWriter} for a description of the various
+ * lightweight compression techniques.
+ */
+class IntegerCompressionReader {
+ private final InStream input;
+ private final boolean signed;
+ private final long[] literals = new long[IntegerCompressionWriter.MAX_SCOPE];
+ private int numLiterals = 0;
+ private int used = 0;
+
+ IntegerCompressionReader(InStream input, boolean signed) throws IOException {
+ this.input = input;
+ this.signed = signed;
+ }
+
+ private void readValues() throws IOException {
+ // read the first 2 bits and determine the encoding type
+ int firstByte = input.read();
+ if (firstByte < 0) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ } else {
+ int enc = (firstByte >>> 6) & 0x03;
+ if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
+ readShortRepeatValues(firstByte);
+ } else if (EncodingType.DIRECT.ordinal() == enc) {
+ readDirectValues(firstByte);
+ } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
+ readPatchedBaseValues(firstByte);
+ } else {
+ readDeltaValues(firstByte);
+ }
+ }
+ }
+
+ private void readDeltaValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fb = (firstByte >>> 1) & 0x1f;
+ if (fb != 0) {
+ fb = FixedBitSizes.ordinalToFixedBits(fb);
+ }
+
+ // extract the blob run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+
+ // read the first value stored as vint
+ long firstVal = 0;
+ if (signed) {
+ firstVal = SerializationUtils.readVslong(input);
+ } else {
+ firstVal = SerializationUtils.readVulong(input);
+ }
+
+ // store first value to result buffer
+ long prevVal = firstVal;
+ literals[numLiterals++] = firstVal;
+
+ // if fixed bits is 0 then all values have fixed delta
+ if (fb == 0) {
+ // read the fixed delta value stored as vint (deltas can be negative even
+ // if all numbers are positive)
+ long fd = SerializationUtils.readVslong(input);
+
+ // add fixed deltas to adjacent values
+ for (int i = 0; i < len; i++) {
+ literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ }
+ } else {
+ // number of bytes of data blob
+ int bytesToRead = Utils.getTotalBytesRequired(len, fb);
+
+ // create buffer space to fill up the blob and unpack the bit packed
+ // values
+ byte[] buffer = new byte[bytesToRead];
+ for (int i = 0; i < bytesToRead; i++) {
+ buffer[i] = (byte) input.read();
+ }
+
+ // write the unpacked values, zigzag decode the delta values, add
+ // it to previous value and store final value to result buffer
+ BitPackReader bpr = new BitPackReader(fb);
+ long[] adjDeltas = bpr.unpack(buffer, len);
+ for (int i = 0; i < adjDeltas.length; i++) {
+ long zzDelta = Utils.zigzagDecode(adjDeltas[i]);
+ literals[numLiterals++] = prevVal + zzDelta;
+ prevVal = literals[numLiterals - 1];
+ }
+ }
+
+ }
+
+ private void readPatchedBaseValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = FixedBitSizes.ordinalToFixedBits(fbo);
+
+ // extract the run length of data blob
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are always one off
+ len += 1;
+
+ // extract the number of bytes occupied by base
+ int thirdByte = input.read();
+ int bw = (thirdByte >>> 5) & 0x07;
+ // base width is one off
+ bw += 1;
+
+ // extract patch width
+ int pwo = thirdByte & 0x1f;
+ int pw = FixedBitSizes.ordinalToFixedBits(pwo);
+
+ // read fourth byte and extract patch gap width
+ int fourthByte = input.read();
+ int pgw = (fourthByte >>> 5) & 0x07;
+ // patch gap width is one off
+ pgw += 1;
+
+ // extract the length of the patch list
+ int pl = fourthByte & 0x1f;
+
+ // read the next base width number of bytes to extract base value
+ byte[] buffer = new byte[bw];
+ for (int i = 0; i < bw; i++) {
+ buffer[i] = (byte) input.read();
+ }
+
+ long base = Utils.bytesToLongBE(buffer);
+
+ // calculate the total bytes of data blob and patch blob
+ // data blob length = fixed width bits (fb) * run length (len)
+ // patch blob length = patch length * (patch width + patch gap width)
+ int blobBytes = Utils.getTotalBytesRequired(len, fb);
+ int patchBlobBytes = Utils.getTotalBytesRequired(pl, pw + pgw);
+
+ // create buffer space and unpack the bit packed data blob
+ buffer = new byte[blobBytes];
+ for (int i = 0; i < blobBytes; i++) {
+ buffer[i] = (byte) input.read();
+ }
+
+ // create buffer space and unpack the bit packed patch blob
+ byte[] patchBuffer = new byte[patchBlobBytes];
+ for (int i = 0; i < patchBlobBytes; i++) {
+ patchBuffer[i] = (byte) input.read();
+ }
+
+ // unpack the patch blob
+ BitPackReader bpr = new BitPackReader(pw + pgw);
+ long[] unpackedPatch = bpr.unpack(patchBuffer, pl);
+
+ // apply the patch directly when decoding the packed data
+ int patchIdx = 0;
+ long currGap = 0;
+ long currPatch = 0;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ long actualGap = 0;
+
+ // special case: if the gap is >255 then the patch value will be 0;
+ // if the gap is <=255 then the patch value cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // unpack data blob, patch it (if required), add base and the zigzag
+ // decode the value to get final result
+ bpr = new BitPackReader(fb);
+ long[] unpacked = bpr.unpack(buffer, len);
+ for (int i = 0; i < unpacked.length; i++) {
+ if (i == actualGap) {
+ // apply patch
+ long patchedVal = unpacked[i] | (currPatch << fb);
+
+ // add base to patched value and zigzag decode it
+ if (signed) {
+ literals[numLiterals++] = Utils.zigzagDecode(base + patchedVal);
+ } else {
+ literals[numLiterals++] = base + patchedVal;
+ }
+
+ // increment the patch to point to next entry in patch list
+ patchIdx++;
+
+ if (patchIdx < pl) {
+ // read the next gap and patch
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ actualGap = 0;
+
+ // special case: if the gap is >255 then the patch will be 0; if the gap
+ // is <=255 then the patch cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // next gap is relative to the current gap
+ actualGap += i;
+ }
+ } else {
+ // no patching required. add base to unpacked value and zigzag
+ // decode to get final value
+ if (signed) {
+ literals[numLiterals++] = Utils.zigzagDecode(base + unpacked[i]);
+ } else {
+ literals[numLiterals++] = base + unpacked[i];
+ }
+ }
+ }
+
+ }
+
+ private void readDirectValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = FixedBitSizes.ordinalToFixedBits(fbo);
+
+ // extract the run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are one off
+ len += 1;
+
+ // read the bit packed data blob
+ int bytesToRead = Utils.getTotalBytesRequired(len, fb);
+
+ // create buffer space to fill up the blob and unpack the values
+ byte[] buffer = new byte[bytesToRead];
+ for (int i = 0; i < bytesToRead; i++) {
+ buffer[i] = (byte) input.read();
+ }
+
+ // write the unpacked values and zigzag decode to result buffer
+ BitPackReader bpr = new BitPackReader(fb);
+ long[] unpacked = bpr.unpack(buffer, len);
+ if (signed) {
+ for (int i = 0; i < unpacked.length; i++) {
+ literals[numLiterals++] = Utils.zigzagDecode(unpacked[i]);
+ }
+ } else {
+ for (int i = 0; i < unpacked.length; i++) {
+ literals[numLiterals++] = unpacked[i];
+ }
+ }
+ }
+
+ private void readShortRepeatValues(int firstByte) throws IOException {
+
+ // read the number of bytes occupied by the value
+ int size = (firstByte >>> 3) & 0x07;
+ // #bytes are one off
+ size += 1;
+
+ // read the run length
+ int len = firstByte & 0x07;
+ // run length values are stored only after the MIN_REPEAT threshold is met
+ len += IntegerCompressionWriter.MIN_REPEAT;
+
+ // read the repeated value which is stored using fixed bytes
+ byte[] buffer = new byte[size];
+ for (int i = 0; i < size; i++) {
+ buffer[i] = (byte) input.read();
+ }
+
+ long val = Utils.bytesToLongBE(buffer);
+ if (signed) {
+ val = Utils.zigzagDecode(val);
+ }
+
+ // repeat the value for length times
+ for (int i = 0; i < len; i++) {
+ literals[numLiterals++] = val;
+ }
+ }
+
+ boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ long next() throws IOException {
+ long result;
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues();
+ }
+ result = literals[used++];
+ return result;
+ }
+
+ void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two
+ // parts
+ while (consumed > 0) {
+ numLiterals = 0;
+ readValues();
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ void skip(long numValues) throws IOException {
+ while (numValues > 0) {
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues();
+ }
+ long consume = Math.min(numValues, numLiterals - used);
+ used += consume;
+ numValues -= consume;
+ }
+ }
+}
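As a worked example of the header layout consumed by readShortRepeatValues, the byte 0x0A decodes as follows (plain arithmetic, no stream required):

    int firstByte = 0x0A;                        // 0b00_001_010
    int enc  = (firstByte >>> 6) & 0x03;         // 0 -> SHORT_REPEAT
    int size = ((firstByte >>> 3) & 0x07) + 1;   // repeated value occupies 2 bytes
    int len  = (firstByte & 0x07) + 3;           // 2 + MIN_REPEAT(3) = 5 repetitions
    // the next 2 bytes hold the big-endian value that is repeated 5 times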
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionWriter.java
new file mode 100644
index 0000000..0466f74
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionWriter.java
@@ -0,0 +1,857 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A writer that performs lightweight compression over a sequence of integers.
+ *
+ * There are four types of lightweight integer compression
+ *
+ * SHORT_REPEAT
+ * DIRECT
+ * PATCHED_BASE
+ * DELTA
+ *
+ *
+ * The description and format for these types are as below:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *
+ * 1 byte header
+ *
+ * 2 bits for encoding type
+ * 3 bits for bytes required for repeating value
+ * 3 bits for repeat count (MIN_REPEAT + run length)
+ *
+ *
+ * Blob - repeat value (fixed bytes)
+ *
+ *
+ *
+ * DIRECT: Used for random integer sequences whose bit-width requirement does
+ * not vary much.
+ *
+ * 2 bytes header
+ *
+ * 1st byte
+ * 2 bits for encoding type
+ * 5 bits for fixed bit width of values in blob
+ * 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * 8 bits for lower run length bits
+ *
+ *
+ * Blob - fixed width * run length bits long
+ *
+ *
+ *
+ * PATCHED_BASE: Used for random integer sequences whose bit-width requirement
+ * varies beyond a threshold.
+ *
+ * 4 bytes header
+ *
+ * 1st byte
+ * 2 bits for encoding type
+ * 5 bits for fixed bit width of values in blob
+ * 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * 8 bits for lower run length bits
+ *
+ *
+ * 3rd byte
+ * 3 bits for bytes required for base value
+ * 5 bits for patch width
+ *
+ *
+ * 4th byte
+ * 3 bits for patch gap width
+ * 5 bits for patch length
+ *
+ *
+ * Base value - base width * 8 bits
+ * Data blob - fixed width * run length
+ * Patch blob - (patch width + patch gap width) * patch length
+ *
+ *
+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences, sequences with fixed
+ * delta values or long repeated sequences.
+ *
+ * 2 bytes header
+ *
+ * 1st byte
+ * 2 bits for encoding type
+ * 5 bits for fixed bit width of values in blob
+ * 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * 8 bits for lower run length bits
+ *
+ *
+ * Base value - encoded as varint
+ * Delta base (only long fixed delta runs) - zigzag encoded
+ * Delta blob (variable delta runs) - zigzag encoded
+ *
+ *
+ */
+class IntegerCompressionWriter {
+
+ public enum EncodingType {
+ SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+ }
+
+ static final int MAX_SCOPE = 512;
+ static final int MIN_REPEAT = 3;
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ private static final int MAX_SHORT_REPEAT_SIZE = 8;
+ private long prevDelta = 0;
+ private int fixedRunLength = 0;
+ private int variableRunLength = 0;
+ private long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private EncodingType encoding = null;
+ private int numLiterals = 0;
+ private long min = 0;
+ private long max = 0;
+ private long[] zigzagLiterals = null;
+ private long[] zzBaseReduced = null;
+ private long[] zzAdjDeltas = null;
+ private long fixedDelta = 0;
+ private int zzBits90p = 0;
+ private int zzBits100p = 0;
+ private int zzBRBits95p = 0;
+ private int zzBRBits100p = 0;
+ private int bitsDeltaMax = 0;
+ private int patchWidth = 0;
+ private int patchGapWidth = 0;
+ private int patchLength = 0;
+ private long[] gapVsPatchList = null;
+ private long zzMin = 0;
+ private boolean isFixedDelta = false;
+
+ IntegerCompressionWriter(PositionedOutputStream output, boolean signed) {
+ this.output = output;
+ this.signed = signed;
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+
+ if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+ writeShortRepeatValues();
+ } else if (encoding.equals(EncodingType.DIRECT)) {
+ writeDirectValues();
+ } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+ writePatchedBaseValues();
+ } else {
+ writeDeltaValues();
+ }
+
+ // clear all the variables
+ clear();
+ }
+ }
+
+ private void writeDeltaValues() throws IOException {
+ // write encoding type in top 2 bits
+ int enc = encoding.ordinal() << 6;
+
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int cfb = 0;
+ int fbo = 0;
+
+ if (isFixedDelta) {
+ // if the fixed delta is 0 then the sequence is counted as fixed
+ // run length else as variable run length
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ cfb = FixedBitSizes.getClosestFixedBits(fb);
+ // fixed width 0 is used for fixed delta runs. a sequence that needs
+ // only 1 bit to encode will use an additional bit (width 2)
+ if (cfb == 1) {
+ cfb = 2;
+ }
+ fbo = FixedBitSizes.fixedBitsToOrdinal(cfb) << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = enc | fbo | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = len & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first value from zigzag literal array
+ if (signed) {
+ SerializationUtils.writeVslong(output, literals[0]);
+ } else {
+ SerializationUtils.writeVulong(output, literals[0]);
+ }
+
+ // if delta is fixed then we don't need to store delta blob else store
+ // delta blob using fixed bit packing
+ if (isFixedDelta) {
+ // store the fixed delta using zigzag encoding. deltas can be negative
+ // even if all values are positive
+ SerializationUtils.writeVslong(output, fixedDelta);
+ } else {
+ // bit pack the delta blob
+ BitPackWriter bpw = new BitPackWriter(cfb);
+ byte[] packed = bpw.pack(zzAdjDeltas, zzAdjDeltas.length);
+ for (int i = 0; i < packed.length; i++) {
+ output.write(packed[i]);
+ }
+ }
+ }
+
+ private void writePatchedBaseValues() throws IOException {
+ // write encoding type in top 2 bits
+ int enc = encoding.ordinal() << 6;
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = zzBRBits95p;
+ int fbc = FixedBitSizes.getClosestFixedBits(fb);
+ int fbo = FixedBitSizes.fixedBitsToOrdinal(fbc) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = enc | fbo | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width
+ int baseWidth = Utils.findNumBits(zzMin);
+ baseWidth = FixedBitSizes.getClosestFixedBits(baseWidth);
+ int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ int bb = (baseBytes - 1) << 5;
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ int headerThirdByte = bb | FixedBitSizes.fixedBitsToOrdinal(patchWidth);
+
+ // fourth byte contains 3 bits for patch gap width and 5 bits for
+ // patch length
+ int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for (int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((zzMin >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // bit pack the base reduced values and write each byte
+ int closestFixedBits = FixedBitSizes.getClosestFixedBits(zzBRBits95p);
+ BitPackWriter bpw = new BitPackWriter(closestFixedBits);
+ byte[] packed = bpw.pack(zzBaseReduced, numLiterals);
+ for (int i = 0; i < packed.length; i++) {
+ output.write(packed[i]);
+ }
+
+ // write the patch blob
+ closestFixedBits = FixedBitSizes.getClosestFixedBits(patchGapWidth
+ + patchWidth);
+ bpw = new BitPackWriter(closestFixedBits);
+ packed = bpw.pack(gapVsPatchList, patchLength);
+ for (int i = 0; i < packed.length; i++) {
+ output.write(packed[i]);
+ }
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeDirectValues() throws IOException {
+ // write encoding type in top 2 bits
+ int enc = encoding.ordinal() << 6;
+
+ // write the number of fixed bits required in next 5 bits
+ int cfb = zzBits100p;
+ int fbo = FixedBitSizes.fixedBitsToOrdinal(cfb) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = enc | fbo | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of
+ // runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit pack the zigzag encoded values and write each byte
+ BitPackWriter bpw = new BitPackWriter(cfb);
+ byte[] packed = bpw.pack(zigzagLiterals, zigzagLiterals.length);
+ for (int i = 0; i < packed.length; i++) {
+ output.write(packed[i]);
+ }
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // System.out.println("SHORT_REPEAT: " + numLiterals + " Literals: "
+ // + Longs.asList(Arrays.copyOf(literals, numLiterals)));
+
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = Utils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ int numBitsRepeatVal = Utils.findNumBits(repeatVal);
+ numBitsRepeatVal = FixedBitSizes.getClosestFixedBits(numBitsRepeatVal);
+ int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // if the run is too long or too short, the repeat value needs too many
+ // bytes, or the delta is non-zero, then choose a different algorithm
+ if (fixedRunLength >= MIN_REPEAT
+ && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH
+ && numBytesRepeatVal <= MAX_SHORT_REPEAT_SIZE && prevDelta == 0) {
+ // write encoding type in top 2 bits
+ int header = encoding.ordinal() << 6;
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the payload (i.e. the repeat value) in big endian
+ for (int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ fixedRunLength = 0;
+ } else {
+ determineEncoding();
+ writeValues();
+ }
+ }
+
+ private void determineEncoding() {
+ // used for direct encoding
+ zigzagLiterals = new long[numLiterals];
+
+ // used for patched base encoding
+ zzBaseReduced = new long[numLiterals];
+
+ // used for delta encoding
+ zzAdjDeltas = new long[numLiterals - 1];
+
+ int idx = 0;
+
+ // for identifying monotonic sequences
+ boolean isIncreasing = false;
+ int increasingCount = 1;
+ boolean isDecreasing = false;
+ int decreasingCount = 1;
+
+ // for identifying type of delta encoding
+ long currMin = literals[0];
+ long currMax = literals[0];
+ isFixedDelta = true;
+ long currDelta = 0;
+
+ if (signed) {
+ zzMin = Utils.zigzagEncode(literals[0]);
+ } else {
+ zzMin = literals[0];
+ }
+ long deltaMax = 0;
+
+ // populate all variables to identify the encoding type
+ if (numLiterals >= 1) {
+ currDelta = literals[1] - literals[0];
+ for (int i = 0; i < numLiterals; i++) {
+ if (i > 0 && literals[i] >= currMax) {
+ currMax = literals[i];
+ increasingCount++;
+ }
+
+ if (i > 0 && literals[i] <= currMin) {
+ currMin = literals[i];
+ decreasingCount++;
+ }
+
+ // if the delta doesn't change then keep the run marked as fixed delta
+ if (i > 0 && isFixedDelta) {
+ if (literals[i] - literals[i - 1] != currDelta) {
+ isFixedDelta = false;
+ }
+
+ fixedDelta = currDelta;
+ }
+
+ // store the minimum value among zigzag encoded values. The min
+ // value (base) will be removed in patched base encoding
+ long zzEncVal = 0;
+ if (signed) {
+ zzEncVal = Utils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ if (zzEncVal < zzMin) {
+ zzMin = zzEncVal;
+ }
+ zigzagLiterals[idx] = zzEncVal;
+ idx++;
+
+ // max delta value is required for computing the fixed bits
+ // required for delta blob in delta encoding
+ if (i > 0) {
+ zzAdjDeltas[i - 1] = Utils.zigzagEncode(literals[i] - literals[i - 1]);
+ if (zzAdjDeltas[i - 1] > deltaMax) {
+ deltaMax = zzAdjDeltas[i - 1];
+ }
+ }
+ }
+
+ // stores the number of bits required for packing delta blob in
+ // delta encoding
+ bitsDeltaMax = Utils.findNumBits(deltaMax);
+
+ // if decreasing count equals total number of literals then the
+ // sequence is monotonically decreasing
+ if (increasingCount == 1 && decreasingCount == numLiterals) {
+ isDecreasing = true;
+ }
+
+ // if increasing count equals total number of literals then the
+ // sequence is monotonically increasing
+ if (decreasingCount == 1 && increasingCount == numLiterals) {
+ isIncreasing = true;
+ }
+ }
+
+ // if the sequence is both increasing and decreasing then it is not
+ // monotonic
+ if (isDecreasing && isIncreasing) {
+ isDecreasing = false;
+ isIncreasing = false;
+ }
+
+ // percentile values are computed for the zigzag encoded values. if the
+ // number of bit requirement between 90th and 100th percentile varies
+ // beyond a threshold then we need to patch the values. if the variation
+ // is not significant then we can use direct or delta encoding
+
+ // percentile is called multiple times and so we will sort the array
+ // once and reuse it
+ long[] sortedZigzag = Arrays.copyOf(zigzagLiterals, zigzagLiterals.length);
+ Arrays.sort(sortedZigzag);
+ double p = 0.9;
+ long p90Val = (long) Utils.percentile(sortedZigzag, p, true);
+ zzBits90p = Utils.findNumBits(p90Val);
+ zzBits90p = FixedBitSizes.getClosestFixedBits(zzBits90p);
+
+ p = 1.0;
+ long p100Val = (long) Utils.percentile(sortedZigzag, p, true);
+ zzBits100p = Utils.findNumBits(p100Val);
+ zzBits100p = FixedBitSizes.getClosestFixedBits(zzBits100p);
+
+ int diffBitsLH = zzBits100p - zzBits90p;
+
+ // if the difference between 90th percentile and 100th percentile fixed
+ // bits is > 1 then we need to patch the values
+ if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1
+ && isFixedDelta == false) {
+ // patching is done only on base reduced values.
+ // remove base from the zigzag encoded values
+ for (int i = 0; i < zigzagLiterals.length; i++) {
+ zzBaseReduced[i] = zigzagLiterals[i] - zzMin;
+ }
+
+ // percentile is computed multiple times, so sort and reuse
+ long[] sortedZZBaseReduced = Arrays.copyOf(zzBaseReduced,
+ zzBaseReduced.length);
+ Arrays.sort(sortedZZBaseReduced);
+
+ // 95th percentile width is used to determine max allowed value
+ // after which patching will be done
+ p = 0.95;
+ long p95Val = (long) Utils.percentile(sortedZZBaseReduced, p, true);
+ zzBRBits95p = Utils.findNumBits(p95Val);
+ zzBRBits95p = FixedBitSizes.getClosestFixedBits(zzBRBits95p);
+
+ // 100th percentile is used to compute the max patch width
+ p = 1.0;
+ p100Val = (long) Utils.percentile(sortedZZBaseReduced, p, true);
+ zzBRBits100p = Utils.findNumBits(p100Val);
+ zzBRBits100p = FixedBitSizes.getClosestFixedBits(zzBRBits100p);
+
+ // after base reducing the values, if the difference in bits between
+ // 95th percentile and 100th percentile value is zero then there
+ // is no point in patching the values, in which case we will
+ // fallback to DIRECT encoding. The reason for this is max value
+ // (beyond which patching is done) is based on 95th percentile
+ // base reduced value.
+ if ((zzBRBits100p - zzBRBits95p) != 0) {
+ encoding = EncodingType.PATCHED_BASE;
+ preparePatchedBlob();
+ return;
+ } else {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ }
+
+ // if difference in bits between 95th percentile and 100th percentile is
+ // 0, then patch length will become 0. Hence we will fallback to direct
+ if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
+ && isFixedDelta == false) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
+ && isFixedDelta == true) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ if (encoding == null) {
+ throw new RuntimeException("Integer encoding cannot be determined.");
+ }
+ }
+
+ private void preparePatchedBlob() {
+ // mask will be max value beyond which patch will be generated
+ int mask = (1 << zzBRBits95p) - 1;
+
+ // since we are considering only the 95th percentile, the gap and
+ // patch arrays can hold at most 5% of the values
+ patchLength = (int) Math.ceil((zzBaseReduced.length * 0.05));
+ int[] gapList = new int[patchLength];
+ long[] patchList = new long[patchLength];
+
+ // #bit for patch
+ patchWidth = zzBRBits100p - zzBRBits95p;
+ patchWidth = FixedBitSizes.getClosestFixedBits(patchWidth);
+
+ int gapIdx = 0;
+ int patchIdx = 0;
+ int prev = 0;
+ int gap = 0;
+ int maxGap = 0;
+
+ for (int i = 0; i < zzBaseReduced.length; i++) {
+ // if value is above mask then create the patch and record the gap
+ if (zzBaseReduced[i] > mask) {
+ gap = i - prev;
+ if (gap > maxGap) {
+ maxGap = gap;
+ }
+
+ // gaps are relative, so remember the index of this patched value
+ prev = i;
+ gapList[gapIdx++] = gap;
+
+ // extract the most significant bits that are over mask
+ long patch = zzBaseReduced[i] >>> zzBRBits95p;
+ patchList[patchIdx++] = patch;
+
+ // strip off the MSB to enable safe bit packing
+ zzBaseReduced[i] &= mask;
+ }
+ }
+
+ // adjust the patch length to number of entries in gap list
+ patchLength = gapIdx;
+
+ // if the element to be patched is the first and only element then
+ // max gap will be 0, but to store the gap as 0 we need at least 1 bit
+ if (maxGap == 0 && patchLength != 0) {
+ patchGapWidth = 1;
+ } else {
+ patchGapWidth = Utils.findNumBits(maxGap);
+ }
+
+ // special case: if the maximum gap is greater than 255, then it needs
+ // more than 8 bits to encode, but the header has only 3 bits to record
+ // the gap width (at most 8 bits). To deal with this case we split the
+ // entry in the final patch list into multiple entries:
+ // gap of 255 => 0 for patch value
+ // remaining gap => actual patch value
+ if (patchGapWidth > 8) {
+ patchGapWidth = 8;
+ // for gap = 511, we need two additional entries in patch list
+ if (maxGap == 511) {
+ patchLength += 2;
+ } else {
+ patchLength += 1;
+ }
+ }
+
+ // create gap vs patch list
+ gapIdx = 0;
+ patchIdx = 0;
+ gapVsPatchList = new long[patchLength];
+ for (int i = 0; i < patchLength; i++) {
+ long g = gapList[gapIdx++];
+ long p = patchList[patchIdx++];
+ while (g > 255) {
+ gapVsPatchList[i++] = (255 << patchWidth) | 0;
+ g -= 255;
+ }
+
+ // store patch value in LSBs and gap in MSBs
+ gapVsPatchList[i] = (g << patchWidth) | p;
+ }
+ }
+
+ /**
+ * clears all the variables
+ */
+ private void clear() {
+ numLiterals = 0;
+ min = 0;
+ max = 0;
+ encoding = null;
+ prevDelta = 0;
+ zigzagLiterals = null;
+ zzBaseReduced = null;
+ zzAdjDeltas = null;
+ fixedDelta = 0;
+ zzBits90p = 0;
+ zzBits100p = 0;
+ zzBRBits95p = 0;
+ zzBRBits100p = 0;
+ bitsDeltaMax = 0;
+ patchGapWidth = 0;
+ patchLength = 0;
+ patchWidth = 0;
+ gapVsPatchList = null;
+ zzMin = 0;
+ isFixedDelta = false;
+ }
+
+ void flush() throws IOException {
+ // if only one element is left in buffer then use short repeat encoding
+ if (numLiterals == 1) {
+ encoding = EncodingType.SHORT_REPEAT;
+ fixedRunLength = 1;
+ writeValues();
+ }
+
+ // if variable runs are not 0 then determine the encoding to use
+ // and flush out the buffer
+ if (variableRunLength != 0 && numLiterals != 0) {
+ determineEncoding();
+ writeValues();
+ }
+
+ // if fixed runs are not 0 then flush out the buffer
+ if (fixedRunLength != 0 && numLiterals != 0) {
+ // encoding = EncodingType.SHORT_REPEAT;
+ if (fixedRunLength < MIN_REPEAT) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ determineEncoding();
+ writeValues();
+ } else {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ }
+ }
+
+ output.flush();
+ }
+
+ void write(long val) throws IOException {
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // update min and max values
+ if (val <= min) {
+ min = val;
+ }
+
+ if (val >= max) {
+ max = val;
+ }
+
+ if (numLiterals == 1) {
+ prevDelta = val - literals[0];
+ literals[numLiterals++] = val;
+ // if both values are the same then count it as a fixed run, else a variable run
+ if (val == literals[0]) {
+ fixedRunLength = 2;
+ variableRunLength = 0;
+ } else {
+ fixedRunLength = 0;
+ variableRunLength = 2;
+ }
+ } else {
+ long currentDelta = val - literals[numLiterals - 1];
+ if (prevDelta == 0 && currentDelta == 0) {
+ // fixed delta run
+
+ literals[numLiterals++] = val;
+
+ // if variable run is non-zero then we are seeing repeating
+ // values at the end of variable run in which case keep
+ // updating variable and fixed runs
+ if (variableRunLength > 0) {
+ fixedRunLength = 2;
+ }
+ fixedRunLength += 1;
+
+ // if fixed run met the minimum condition and if variable
+ // run is non-zero then flush the variable run and shift the
+ // tail fixed runs to start of the buffer
+ if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+ numLiterals -= MIN_REPEAT;
+ variableRunLength -= MIN_REPEAT - 1;
+ // copy the tail fixed runs
+ long[] tailVals = Arrays.copyOfRange(literals, numLiterals,
+ numLiterals + MIN_REPEAT);
+
+ // determine variable encoding and flush values
+ determineEncoding();
+ writeValues();
+
+ // shift tail fixed runs to beginning of the buffer
+ for (long l : tailVals) {
+ literals[numLiterals++] = l;
+ }
+ }
+
+ // if fixed runs reached max repeat length then write values
+ if (fixedRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ } else {
+ // variable delta run
+
+ // if fixed run length is non-zero and if it satisfies the
+ // short repeat algorithm conditions then write the values
+ // as short repeats else determine the appropriate algorithm
+ // to persist the values
+ if (fixedRunLength >= MIN_REPEAT) {
+ if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ determineEncoding();
+ writeValues();
+ }
+ }
+
+ // if the fixed run length is less than MIN_REPEAT and the current value
+ // differs from the previous value, then treat it as a variable run
+ if (val != literals[numLiterals - 1]) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ }
+ }
+
+ // after writing values re-initialize the variables
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // keep updating variable run lengths
+ prevDelta = val - literals[numLiterals - 1];
+ literals[numLiterals++] = val;
+ variableRunLength += 1;
+
+ // if variable run length reaches the max scope, write it
+ if (variableRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void initializeLiterals(long val) {
+ literals[numLiterals++] = val;
+ min = val;
+ max = val;
+ fixedRunLength = 1;
+ variableRunLength = 1;
+ }
+
+ void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
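The writer leans on Utils.zigzagEncode/zigzagDecode, which are not part of this patch; the standard zigzag mapping is assumed here. It interleaves negative and positive numbers so that values close to zero need few bits, which is what makes the bit packing above effective:

    static long zigzagEncode(long n) { return (n << 1) ^ (n >> 63); }  // 0->0, -1->1, 1->2, -2->3
    static long zigzagDecode(long n) { return (n >>> 1) ^ -(n & 1); }  // inverse mapping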
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 06103e3..3278f92 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -114,14 +114,18 @@ public long getNext() {
protected final int columnId;
private BitFieldReader present = null;
protected boolean valuePresent = false;
+ protected boolean isDirectV2;
TreeReader(Path path, int columnId) {
this.path = path;
this.columnId = columnId;
+ // FIXME: make it user configurable?
+ this.isDirectV2 = true;
}
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId + " of " + path);
}
@@ -130,6 +134,9 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
void startStripe(Map<StreamName, InStream> streams,
                 List<OrcProto.ColumnEncoding> encoding
) throws IOException {
+ if(encoding.get(columnId).getKind() == OrcProto.ColumnEncoding.Kind.DIRECT) {
+ this.isDirectV2 = false;
+ }
checkEncoding(encoding.get(columnId));
InStream in = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.PRESENT));
@@ -264,6 +271,7 @@ void skipRows(long items) throws IOException {
private static class ShortTreeReader extends TreeReader{
private RunLengthIntegerReader reader = null;
+ private IntegerCompressionReader readerV2 = null;
ShortTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -276,13 +284,21 @@ void startStripe(Map<StreamName, InStream> streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (super.isDirectV2) {
+ readerV2 = new IntegerCompressionReader(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- reader.seek(index[columnId]);
+ if (reader != null) {
+ reader.seek(index[columnId]);
+ } else {
+ readerV2.seek(index[columnId]);
+ }
}
@Override
@@ -295,19 +311,28 @@ Object next(Object previous) throws IOException {
} else {
result = (ShortWritable) previous;
}
- result.set((short) reader.next());
+ if (reader != null) {
+ result.set((short) reader.next());
+ } else {
+ result.set((short) readerV2.next());
+ }
}
return result;
}
@Override
void skipRows(long items) throws IOException {
- reader.skip(countNonNulls(items));
+ if (reader != null) {
+ reader.skip(countNonNulls(items));
+ } else {
+ readerV2.skip(countNonNulls(items));
+ }
}
}
private static class IntTreeReader extends TreeReader{
private RunLengthIntegerReader reader = null;
+ private IntegerCompressionReader readerV2 = null;
IntTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -320,13 +345,21 @@ void startStripe(Map<StreamName, InStream> streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ readerV2 = new IntegerCompressionReader(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- reader.seek(index[columnId]);
+ if (reader != null) {
+ reader.seek(index[columnId]);
+ } else {
+ readerV2.seek(index[columnId]);
+ }
}
@Override
@@ -339,19 +372,28 @@ Object next(Object previous) throws IOException {
} else {
result = (IntWritable) previous;
}
- result.set((int) reader.next());
+ if (reader != null) {
+ result.set((int) reader.next());
+ } else {
+ result.set((int) readerV2.next());
+ }
}
return result;
}
@Override
void skipRows(long items) throws IOException {
- reader.skip(countNonNulls(items));
+ if (reader != null) {
+ reader.skip(countNonNulls(items));
+ } else {
+ readerV2.skip(countNonNulls(items));
+ }
}
}
private static class LongTreeReader extends TreeReader{
private RunLengthIntegerReader reader = null;
+ private IntegerCompressionReader readerV2 = null;
LongTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -364,13 +406,21 @@ void startStripe(Map<StreamName, InStream> streams,
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ if (isDirectV2) {
+ readerV2 = new IntegerCompressionReader(streams.get(name), true);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- reader.seek(index[columnId]);
+ if (reader != null) {
+ reader.seek(index[columnId]);
+ } else {
+ readerV2.seek(index[columnId]);
+ }
}
@Override
@@ -383,14 +433,22 @@ Object next(Object previous) throws IOException {
} else {
result = (LongWritable) previous;
}
- result.set(reader.next());
+ if (reader != null) {
+ result.set(reader.next());
+ } else {
+ result.set(readerV2.next());
+ }
}
return result;
}
@Override
void skipRows(long items) throws IOException {
- reader.skip(countNonNulls(items));
+ if (reader != null) {
+ reader.skip(countNonNulls(items));
+ } else {
+ readerV2.skip(countNonNulls(items));
+ }
}
}
@@ -489,7 +547,8 @@ void skipRows(long items) throws IOException {
private static class BinaryTreeReader extends TreeReader{
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private RunLengthIntegerReader lengths = null;
+ private IntegerCompressionReader lengthsV2 = null;
BinaryTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -503,16 +562,26 @@ void startStripe(Map<StreamName, InStream> streams,
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
- false);
+ if (isDirectV2) {
+ lengthsV2 = new IntegerCompressionReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
stream.seek(index[columnId]);
- lengths.seek(index[columnId]);
+ if (lengths != null) {
+ lengths.seek(index[columnId]);
+ } else {
+ lengthsV2.seek(index[columnId]);
+ }
}
@Override
@@ -525,7 +594,12 @@ Object next(Object previous) throws IOException {
} else {
result = (BytesWritable) previous;
}
- int len = (int) lengths.next();
+ int len = 0;
+ if (lengths != null) {
+ len = (int) lengths.next();
+ } else {
+ len = (int) lengthsV2.next();
+ }
result.setSize(len);
int offset = 0;
while (len > 0) {
@@ -544,16 +618,24 @@ Object next(Object previous) throws IOException {
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
- for(int i=0; i < items; ++i) {
- lengthToSkip += lengths.next();
+ if (lengths != null) {
+ for (int i = 0; i < items; ++i) {
+ lengthToSkip += lengths.next();
+ }
+ } else {
+ for (int i = 0; i < items; ++i) {
+ lengthToSkip += lengthsV2.next();
+ }
}
stream.skip(lengthToSkip);
}
}
private static class TimestampTreeReader extends TreeReader{
- private RunLengthIntegerReader data;
- private RunLengthIntegerReader nanos;
+ private RunLengthIntegerReader data = null;
+ private RunLengthIntegerReader nanos = null;
+ private IntegerCompressionReader dataV2 = null;
+ private IntegerCompressionReader nanosV2 = null;
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -564,17 +646,32 @@ void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.DATA)), true);
- nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.SECONDARY)), false);
+ if (isDirectV2) {
+ dataV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanosV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ } else {
+ data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- data.seek(index[columnId]);
- nanos.seek(index[columnId]);
+ if (data != null) {
+ data.seek(index[columnId]);
+ } else {
+ dataV2.seek(index[columnId]);
+ }
+ if (nanos != null) {
+ nanos.seek(index[columnId]);
+ } else {
+ nanosV2.seek(index[columnId]);
+ }
}
@Override
@@ -587,9 +684,21 @@ Object next(Object previous) throws IOException {
} else {
result = (Timestamp) previous;
}
- long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
- WriterImpl.MILLIS_PER_SECOND;
- int newNanos = parseNanos(nanos.next());
+ long millis = 0;
+ int newNanos = 0;
+ if (data != null) {
+ millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
+ WriterImpl.MILLIS_PER_SECOND;
+ } else {
+ millis = (dataV2.next() + WriterImpl.BASE_TIMESTAMP) *
+ WriterImpl.MILLIS_PER_SECOND;
+ }
+
+ if (nanos != null) {
+ newNanos = parseNanos(nanos.next());
+ } else {
+ newNanos = parseNanos(nanosV2.next());
+ }
// fix the rounding when we divided by 1000.
if (millis >= 0) {
millis += newNanos / 1000000;
@@ -616,14 +725,23 @@ private static int parseNanos(long serialized) {
@Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
- data.skip(items);
- nanos.skip(items);
+ if (data != null) {
+ data.skip(items);
+ } else {
+ dataV2.skip(items);
+ }
+ if (nanos != null) {
+ nanos.skip(items);
+ } else {
+ nanosV2.skip(items);
+ }
}
}
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
- private RunLengthIntegerReader scaleStream;
+ private RunLengthIntegerReader scaleStream = null;
+ private IntegerCompressionReader scaleStreamV2 = null;
DecimalTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -636,23 +754,37 @@ void startStripe(Map<StreamName, InStream> streams,
super.startStripe(streams, encodings);
valueStream = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
- scaleStream = new RunLengthIntegerReader(streams.get(
- new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ if (isDirectV2) {
+ scaleStreamV2 = new IntegerCompressionReader(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ } else {
+ scaleStream = new RunLengthIntegerReader(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
valueStream.seek(index[columnId]);
- scaleStream.seek(index[columnId]);
+ if (scaleStream != null) {
+ scaleStream.seek(index[columnId]);
+ } else {
+ scaleStreamV2.seek(index[columnId]);
+ }
}
@Override
Object next(Object previous) throws IOException {
super.next(previous);
if (valuePresent) {
- return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
- (int) scaleStream.next());
+ if (scaleStream != null) {
+ return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
+ (int) scaleStream.next());
+ } else {
+ return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
+ (int) scaleStreamV2.next());
+ }
}
return null;
}
@@ -663,7 +795,11 @@ void skipRows(long items) throws IOException {
for(int i=0; i < items; i++) {
SerializationUtils.readBigInteger(valueStream);
}
- scaleStream.skip(items);
+ if (scaleStream != null) {
+ scaleStream.skip(items);
+ } else {
+ scaleStreamV2.skip(items);
+ }
}
}
@@ -671,7 +807,8 @@ void skipRows(long items) throws IOException {
private DynamicByteArray dictionaryBuffer = null;
private int dictionarySize;
private int[] dictionaryOffsets;
- private RunLengthIntegerReader reader;
+ private RunLengthIntegerReader reader = null;
+ private IntegerCompressionReader readerV2 = null;
StringTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -707,27 +844,44 @@ void startStripe(Map<StreamName, InStream> streams,
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = streams.get(name);
RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ IntegerCompressionReader lenReaderV2 = new IntegerCompressionReader(in, false);
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
dictionaryOffsets = new int[dictionarySize + 1];
}
- for(int i=0; i < dictionarySize; ++i) {
- dictionaryOffsets[i] = offset;
- offset += (int) lenReader.next();
+ // for performance reasons, check the encoding version once outside of the loop
+ if (isDirectV2) {
+ for (int i = 0; i < dictionarySize; ++i) {
+ dictionaryOffsets[i] = offset;
+ offset += (int) lenReaderV2.next();
+ }
+ } else {
+ for (int i = 0; i < dictionarySize; ++i) {
+ dictionaryOffsets[i] = offset;
+ offset += (int) lenReader.next();
+ }
}
dictionaryOffsets[dictionarySize] = offset;
in.close();
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), false);
+ if (isDirectV2) {
+ readerV2 = new IntegerCompressionReader(streams.get(name), false);
+ } else {
+ reader = new RunLengthIntegerReader(streams.get(name), false);
+ }
}
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- reader.seek(index[columnId]);
+ if (reader != null) {
+ reader.seek(index[columnId]);
+ } else {
+ readerV2.seek(index[columnId]);
+ }
}
@Override
@@ -735,7 +889,12 @@ Object next(Object previous) throws IOException {
super.next(previous);
Text result = null;
if (valuePresent) {
- int entry = (int) reader.next();
+ int entry = 0;
+ if (reader != null) {
+ entry = (int) reader.next();
+ } else {
+ entry = (int) readerV2.next();
+ }
if (previous == null) {
result = new Text();
} else {
@@ -764,7 +923,11 @@ Object next(Object previous) throws IOException {
@Override
void skipRows(long items) throws IOException {
- reader.skip(countNonNulls(items));
+ if (reader != null) {
+ reader.skip(countNonNulls(items));
+ } else {
+ readerV2.skip(countNonNulls(items));
+ }
}
}
@@ -919,7 +1082,8 @@ void skipRows(long items) throws IOException {
private static class ListTreeReader extends TreeReader {
private final TreeReader elementReader;
- private RunLengthIntegerReader lengths;
+ private RunLengthIntegerReader lengths = null;
+ private IntegerCompressionReader lengthsV2 = null;
ListTreeReader(Path path, int columnId,
List<OrcProto.Type> types,
@@ -933,7 +1097,11 @@ void skipRows(long items) throws IOException {
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- lengths.seek(index[columnId]);
+ if (lengths != null) {
+ lengths.seek(index[columnId]);
+ } else {
+ lengthsV2.seek(index[columnId]);
+ }
elementReader.seek(index);
}
@@ -949,7 +1117,12 @@ Object next(Object previous) throws IOException {
result = (ArrayList<Object>) previous;
}
int prevLength = result.size();
- int length = (int) lengths.next();
+ int length = 0;
+ if (lengths != null) {
+ length = (int) lengths.next();
+ } else {
+ length = (int) lengthsV2.next();
+ }
// extend the list to the new length
for(int i=prevLength; i < length; ++i) {
result.add(null);
@@ -972,8 +1145,13 @@ void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ if (isDirectV2) {
+ lengthsV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ }
if (elementReader != null) {
elementReader.startStripe(streams, encodings);
}
@@ -983,8 +1161,14 @@ void startStripe(Map<StreamName, InStream> streams,
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long childSkip = 0;
- for(long i=0; i < items; ++i) {
- childSkip += lengths.next();
+ if (lengths != null) {
+ for (long i = 0; i < items; ++i) {
+ childSkip += lengths.next();
+ }
+ } else {
+ for (long i = 0; i < items; ++i) {
+ childSkip += lengthsV2.next();
+ }
}
elementReader.skipRows(childSkip);
}
@@ -993,7 +1177,8 @@ void skipRows(long items) throws IOException {
private static class MapTreeReader extends TreeReader {
private final TreeReader keyReader;
private final TreeReader valueReader;
- private RunLengthIntegerReader lengths;
+ private RunLengthIntegerReader lengths = null;
+ private IntegerCompressionReader lengthsV2 = null;
MapTreeReader(Path path,
int columnId,
@@ -1018,7 +1203,11 @@ void skipRows(long items) throws IOException {
@Override
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
- lengths.seek(index[columnId]);
+ if (lengths != null) {
+ lengths.seek(index[columnId]);
+ } else {
+ lengthsV2.seek(index[columnId]);
+ }
keyReader.seek(index);
valueReader.seek(index);
}
@@ -1036,7 +1225,12 @@ Object next(Object previous) throws IOException {
}
// for now just clear and create new objects
result.clear();
- int length = (int) lengths.next();
+ int length = 0;
+ if (lengths != null) {
+ length = (int) lengths.next();
+ } else {
+ length = (int) lengthsV2.next();
+ }
// read the new elements into the array
for(int i=0; i< length; i++) {
result.put(keyReader.next(null), valueReader.next(null));
@@ -1050,8 +1244,13 @@ void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ if (isDirectV2) {
+ lengthsV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ } else {
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ }
if (keyReader != null) {
keyReader.startStripe(streams, encodings);
}
@@ -1064,8 +1263,14 @@ void startStripe(Map<StreamName, InStream> streams,
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long childSkip = 0;
- for(long i=0; i < items; ++i) {
- childSkip += lengths.next();
+ if (lengths != null) {
+ for (long i = 0; i < items; ++i) {
+ childSkip += lengths.next();
+ }
+ } else {
+ for (long i = 0; i < items; ++i) {
+ childSkip += lengthsV2.next();
+ }
}
keyReader.skipRows(childSkip);
valueReader.skipRows(childSkip);
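All of the reader changes above follow a single dispatch pattern: each tree reader declares both a RunLengthIntegerReader field and an IntegerCompressionReader field, startStripe instantiates exactly one of them depending on isDirectV2, and seek/next/skipRows branch on whichever field is non-null. A minimal sketch of that pattern is below; the class name and simplified signatures are illustrative assumptions (it would also need to live in the org.apache.hadoop.hive.ql.io.orc package and import java.io.IOException), not code taken from the patch.

    // Illustrative sketch only: the class name and signatures are assumptions, not from the patch.
    class LengthColumnSketch {
      private RunLengthIntegerReader lengths = null;      // DIRECT (v1) encoding
      private IntegerCompressionReader lengthsV2 = null;  // DIRECT_V2 encoding

      void startStripe(InStream in, boolean isDirectV2) throws IOException {
        // exactly one of the two readers is created per stripe
        if (isDirectV2) {
          lengthsV2 = new IntegerCompressionReader(in, false);
        } else {
          lengths = new RunLengthIntegerReader(in, false);
        }
      }

      long nextLength() throws IOException {
        // the non-null field records which encoding the stripe was written with
        return (lengths != null) ? lengths.next() : lengthsV2.next();
      }
    }

Because the two reader classes share no common interface in this patch, the null check is repeated at every call site; a shared integer-reader interface would collapse each of these branches to a single call.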
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Utils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Utils.java
new file mode 100644
index 0000000..08d4f73
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Utils.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.Arrays;
+import java.util.Random;
+
+final class Utils {
+
+ // private constructor to prevent instantiation of this utility class
+ private Utils() {
+ }
+
+ /**
+ * Count the number of bits required to encode the given value
+ *
+ * @param value
+ * @return bits required to store value
+ */
+ static int findNumBits(long value) {
+ int count = 0;
+ while (value > 0) {
+ count++;
+ value = value >>> 1;
+ }
+ return count;
+ }
+
+ /**
+ * zigzag encode the given value
+ *
+ * @param val
+ * @return zigzag encoded value
+ */
+ static long zigzagEncode(long val) {
+ return (val << 1) ^ (val >> 63);
+ }
+
+ /**
+ * zigzag decode the given value
+ *
+ * @param val
+ * @return zigzag decoded value
+ */
+ static long zigzagDecode(long val) {
+ return (val >>> 1) ^ -(val & 1);
+ }
+
+ /**
+ * Next pseudorandom, uniformly distributed long value in the range [0, n)
+ *
+ * @param rng
+ * @param n
+ * @return random long value
+ */
+ static long nextLong(Random rng, long n) {
+ long bits, val;
+ do {
+ bits = (rng.nextLong() << 1) >>> 1;
+ val = bits % n;
+ } while (bits - val + (n - 1) < 0L);
+ return val;
+ }
+
+ /**
+ * Compute the specified percentile value for an array
+ *
+ * @param data
+ * - array
+ * @param p
+ * - percentile value (must be > 0.0 and <= 1.0)
+ * @param sorted
+ * - if the array is sorted or not
+ * @return percentile value
+ */
+ static double percentile(long[] data, double p, boolean sorted) {
+ if ((p > 1.0) || (p <= 0.0)) {
+ throw new IllegalArgumentException("invalid percentile value: " + p);
+ }
+
+ long[] input = data;
+
+ if (!sorted) {
+ input = Arrays.copyOf(data, data.length);
+ Arrays.sort(input);
+ }
+
+ int n = input.length;
+ int idx = (int) Math.floor((n + 1) * p);
+ if (idx >= n) {
+ return input[n - 1];
+ }
+
+ if (idx < 1) {
+ return input[0];
+ }
+ long lower = input[idx - 1];
+ long upper = input[idx];
+ double diff = ((n + 1) * p) - idx;
+ return lower + diff * (upper - lower);
+ }
+
+ /**
+ * Convert a big-endian byte array to a long
+ *
+ * @param b
+ * - byte array
+ * @return long value
+ */
+ static long bytesToLongBE(byte[] b) {
+ long out = 0;
+
+ int offset = b.length - 1;
+ for (int i = 0; i < b.length; i++) {
+ long val = 0xff & b[i];
+ out |= (val << (offset * 8));
+ offset -= 1;
+ }
+
+ return out;
+ }
+
+ /**
+ * Calculate the number of bytes required
+ *
+ * @param n
+ * - number of values
+ * @param numBits
+ * - bit width
+ * @return number of bytes required
+ */
+ static int getTotalBytesRequired(int n, int numBits) {
+ return (int) Math.ceil((double) (n * numBits) / 8.0);
+ }
+
+}
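The Utils helpers above are small enough to verify by hand. The snippet below (which assumes same-package access to the package-private Utils class) walks through the round trips they provide; the expected values in the comments are derived from the bit manipulations above, not output captured from a run.

    // Worked example of the new Utils helpers; values computed by hand from the code above.
    long original = -3L;
    long encoded = Utils.zigzagEncode(original);         // (-3 << 1) ^ (-3 >> 63) = 5
    long decoded = Utils.zigzagDecode(encoded);          // (5 >>> 1) ^ -(5 & 1) = -3
    int bits = Utils.findNumBits(encoded);               // 5 = 0b101, so 3 bits
    int bytes = Utils.getTotalBytesRequired(512, bits);  // ceil(512 * 3 / 8) = 192 bytes
    double median = Utils.percentile(new long[]{1, 2, 3, 4, 5}, 0.5, true);  // 3.0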
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 52defb9..df99dc8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -364,6 +364,7 @@ public boolean isCompressed() {
private final PositionedOutputStream rowIndexStream;
private boolean foundNulls;
private OutStream isPresentOutStream;
+ protected final boolean useDirectV2Encoding;
/**
* Create a tree writer.
@@ -379,6 +380,9 @@ public boolean isCompressed() {
this.isCompressed = streamFactory.isCompressed();
this.id = columnId;
this.inspector = inspector;
+ // make direct v2 encoding the default
+ // FIXME: do we need to make this user configurable?
+ this.useDirectV2Encoding = true;
if (nullable) {
isPresentOutStream = streamFactory.createStream(id,
OrcProto.Stream.Kind.PRESENT);
@@ -493,6 +497,10 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
* @return the information about the encoding of this column
*/
OrcProto.ColumnEncoding getEncoding() {
+ if (useDirectV2Encoding) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DIRECT).build();
}
@@ -619,6 +627,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class IntegerTreeWriter extends TreeWriter {
private final RunLengthIntegerWriter writer;
+ private final IntegerCompressionWriter writerV2;
private final ShortObjectInspector shortInspector;
private final IntObjectInspector intInspector;
private final LongObjectInspector longInspector;
@@ -630,7 +639,13 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ if (super.useDirectV2Encoding) {
+ this.writer = null;
+ this.writerV2 = new IntegerCompressionWriter(out, true);
+ } else {
+ this.writer = new RunLengthIntegerWriter(out, true);
+ this.writerV2 = null;
+ }
if (inspector instanceof IntObjectInspector) {
intInspector = (IntObjectInspector) inspector;
shortInspector = null;
@@ -661,7 +676,11 @@ void write(Object obj) throws IOException {
val = shortInspector.get(obj);
}
indexStatistics.updateInteger(val);
- writer.write(val);
+ if (writer != null) {
+ writer.write(val);
+ } else {
+ writerV2.write(val);
+ }
}
}
@@ -669,14 +688,22 @@ void write(Object obj) throws IOException {
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
+ if (writer != null) {
+ writer.flush();
+ } else {
+ writerV2.flush();
+ }
recordPosition(rowIndexPosition);
}
@Override
void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
- writer.getPosition(recorder);
+ if (writer != null) {
+ writer.getPosition(recorder);
+ } else {
+ writerV2.getPosition(recorder);
+ }
}
}
@@ -761,6 +788,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private final PositionedOutputStream stringOutput;
private final RunLengthIntegerWriter lengthOutput;
private final RunLengthIntegerWriter rowOutput;
+ private final IntegerCompressionWriter lengthOutputV2;
+ private final IntegerCompressionWriter rowOutputV2;
private final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
private final DynamicIntArray rows = new DynamicIntArray();
@@ -776,10 +805,21 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
- rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false);
+ if (super.useDirectV2Encoding) {
+ lengthOutput = null;
+ rowOutput = null;
+ lengthOutputV2 = new IntegerCompressionWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutputV2 = new IntegerCompressionWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ } else {
+ lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ lengthOutputV2 = null;
+ rowOutputV2 = null;
+ }
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
@@ -808,7 +848,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
public void visit(StringRedBlackTree.VisitorContext context
) throws IOException {
context.writeBytes(stringOutput);
- lengthOutput.write(context.getLength());
+ if (lengthOutput != null) {
+ lengthOutput.write(context.getLength());
+ } else {
+ lengthOutputV2.write(context.getLength());
+ }
dumpOrder[context.getOriginalPosition()] = currentId++;
}
});
@@ -824,20 +868,37 @@ public void visit(StringRedBlackTree.VisitorContext context
rowIndexEntry < savedRowIndex.size()) {
OrcProto.RowIndexEntry.Builder base =
savedRowIndex.get(rowIndexEntry++).toBuilder();
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ if (rowOutput != null) {
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ } else {
+ rowOutputV2.getPosition(new RowIndexPositionRecorder(base));
+ }
rowIndex.addEntry(base.build());
}
}
if (i != length) {
- rowOutput.write(dumpOrder[rows.get(i)]);
+ if (rowOutput != null) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ } else {
+ rowOutputV2.write(dumpOrder[rows.get(i)]);
+ }
}
}
// we need to build the rowindex before calling super, since it
// writes it out.
super.writeStripe(builder, requiredIndexEntries);
stringOutput.flush();
- lengthOutput.flush();
- rowOutput.flush();
+ if (lengthOutput != null) {
+ lengthOutput.flush();
+ } else {
+ lengthOutputV2.flush();
+ }
+
+ if (rowOutput != null) {
+ rowOutput.flush();
+ } else {
+ rowOutputV2.flush();
+ }
// reset all of the fields to be ready for the next stripe.
dictionary.clear();
rows.clear();
@@ -882,6 +943,7 @@ long estimateMemory() {
private static class BinaryTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
private final RunLengthIntegerWriter length;
+ private final IntegerCompressionWriter lengthV2;
BinaryTreeWriter(int columnId,
ObjectInspector inspector,
@@ -890,8 +952,15 @@ long estimateMemory() {
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.length = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
+ if (super.useDirectV2Encoding) {
+ this.lengthV2 = new IntegerCompressionWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ this.length = null;
+ } else {
+ this.length = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ this.lengthV2 = null;
+ }
recordPosition(rowIndexPosition);
}
@@ -902,7 +971,11 @@ void write(Object obj) throws IOException {
BytesWritable val =
((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
stream.write(val.getBytes(), 0, val.getLength());
- length.write(val.getLength());
+ if (length != null) {
+ length.write(val.getLength());
+ } else {
+ lengthV2.write(val.getLength());
+ }
}
}
@@ -911,7 +984,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
stream.flush();
- length.flush();
+ if (length != null) {
+ length.flush();
+ } else {
+ lengthV2.flush();
+ }
recordPosition(rowIndexPosition);
}
@@ -919,7 +996,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
stream.getPosition(recorder);
- length.getPosition(recorder);
+ if (length != null) {
+ length.getPosition(recorder);
+ } else {
+ lengthV2.getPosition(recorder);
+ }
}
}
@@ -930,16 +1011,29 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class TimestampTreeWriter extends TreeWriter {
private final RunLengthIntegerWriter seconds;
private final RunLengthIntegerWriter nanos;
+ private final IntegerCompressionWriter secondsV2;
+ private final IntegerCompressionWriter nanosV2;
TimestampTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
- this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true);
- this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false);
+ if (super.useDirectV2Encoding) {
+ this.seconds = null;
+ this.nanos = null;
+ this.secondsV2 = new IntegerCompressionWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanosV2 = new IntegerCompressionWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ } else {
+ this.secondsV2 = null;
+ this.nanosV2 = null;
+ this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -950,8 +1044,17 @@ void write(Object obj) throws IOException {
Timestamp val =
((TimestampObjectInspector) inspector).
getPrimitiveJavaObject(obj);
- seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
- nanos.write(formatNanos(val.getNanos()));
+ if (seconds != null) {
+ seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
+ } else {
+ secondsV2.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
+ }
+
+ if (nanos != null) {
+ nanos.write(formatNanos(val.getNanos()));
+ } else {
+ nanosV2.write(formatNanos(val.getNanos()));
+ }
}
}
@@ -959,8 +1062,17 @@ void write(Object obj) throws IOException {
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
- seconds.flush();
- nanos.flush();
+ if (seconds != null) {
+ seconds.flush();
+ } else {
+ secondsV2.flush();
+ }
+
+ if (nanos != null) {
+ nanos.flush();
+ } else {
+ nanosV2.flush();
+ }
recordPosition(rowIndexPosition);
}
@@ -983,14 +1095,23 @@ private static long formatNanos(int nanos) {
@Override
void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
- seconds.getPosition(recorder);
- nanos.getPosition(recorder);
+ if (seconds != null) {
+ seconds.getPosition(recorder);
+ } else {
+ secondsV2.getPosition(recorder);
+ }
+ if (nanos != null) {
+ nanos.getPosition(recorder);
+ } else {
+ nanosV2.getPosition(recorder);
+ }
}
}
private static class DecimalTreeWriter extends TreeWriter {
private final PositionedOutputStream valueStream;
private final RunLengthIntegerWriter scaleStream;
+ private final IntegerCompressionWriter scaleStreamV2;
DecimalTreeWriter(int columnId,
ObjectInspector inspector,
@@ -998,8 +1119,15 @@ void recordPosition(PositionRecorder recorder) throws IOException {
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true);
+ if (super.useDirectV2Encoding) {
+ this.scaleStream = null;
+ this.scaleStreamV2 = new IntegerCompressionWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ } else {
+ this.scaleStreamV2 = null;
+ scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ }
recordPosition(rowIndexPosition);
}
@@ -1011,7 +1139,11 @@ void write(Object obj) throws IOException {
getPrimitiveJavaObject(obj);
SerializationUtils.writeBigInteger(valueStream,
decimal.unscaledValue());
- scaleStream.write(decimal.scale());
+ if (scaleStream != null) {
+ scaleStream.write(decimal.scale());
+ } else {
+ scaleStreamV2.write(decimal.scale());
+ }
indexStatistics.updateDecimal(decimal);
}
}
@@ -1021,7 +1153,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
valueStream.flush();
- scaleStream.flush();
+ if (scaleStream != null) {
+ scaleStream.flush();
+ } else {
+ scaleStreamV2.flush();
+ }
recordPosition(rowIndexPosition);
}
@@ -1029,7 +1165,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
valueStream.getPosition(recorder);
- scaleStream.getPosition(recorder);
+ if (scaleStream != null) {
+ scaleStream.getPosition(recorder);
+ } else {
+ scaleStreamV2.getPosition(recorder);
+ }
}
}
@@ -1077,6 +1217,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
private static class ListTreeWriter extends TreeWriter {
private final RunLengthIntegerWriter lengths;
+ private final IntegerCompressionWriter lengthsV2;
ListTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1088,9 +1229,15 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
childrenWriters[0] =
createTreeWriter(listObjectInspector.getListElementObjectInspector(),
writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
+ if (super.useDirectV2Encoding) {
+ lengths = null;
+ lengthsV2 = new IntegerCompressionWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ lengthsV2 = null;
+ lengths = new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -1100,7 +1247,11 @@ void write(Object obj) throws IOException {
if (obj != null) {
ListObjectInspector insp = (ListObjectInspector) inspector;
int len = insp.getListLength(obj);
- lengths.write(len);
+ if (lengths != null) {
+ lengths.write(len);
+ } else {
+ lengthsV2.write(len);
+ }
for(int i=0; i < len; ++i) {
childrenWriters[0].write(insp.getListElement(obj, i));
}
@@ -1111,7 +1262,11 @@ void write(Object obj) throws IOException {
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
- lengths.flush();
+ if (lengths != null) {
+ lengths.flush();
+ } else {
+ lengthsV2.flush();
+ }
for(TreeWriter child: childrenWriters) {
child.writeStripe(builder, requiredIndexEntries);
}
@@ -1121,12 +1276,17 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
@Override
void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
- lengths.getPosition(recorder);
+ if (lengths != null) {
+ lengths.getPosition(recorder);
+ } else {
+ lengthsV2.getPosition(recorder);
+ }
}
}
private static class MapTreeWriter extends TreeWriter {
private final RunLengthIntegerWriter lengths;
+ private final IntegerCompressionWriter lengthsV2;
MapTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1139,9 +1299,17 @@ void recordPosition(PositionRecorder recorder) throws IOException {
createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
childrenWriters[1] =
createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ if (super.useDirectV2Encoding) {
+ lengths = null;
+ lengthsV2 =
+ new IntegerCompressionWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ } else {
+ lengthsV2 = null;
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ }
recordPosition(rowIndexPosition);
}
@@ -1151,7 +1319,11 @@ void write(Object obj) throws IOException {
if (obj != null) {
MapObjectInspector insp = (MapObjectInspector) inspector;
int len = insp.getMapSize(obj);
- lengths.write(len);
+ if (lengths != null) {
+ lengths.write(len);
+ } else {
+ lengthsV2.write(len);
+ }
// this sucks, but it will have to do until we can get a better
// accessor in the MapObjectInspector.
Map<?, ?> valueMap = insp.getMap(obj);
@@ -1166,7 +1338,11 @@ void write(Object obj) throws IOException {
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
- lengths.flush();
+ if (lengths != null) {
+ lengths.flush();
+ } else {
+ lengthsV2.flush();
+ }
for(TreeWriter child: childrenWriters) {
child.writeStripe(builder, requiredIndexEntries);
}
@@ -1176,7 +1352,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
@Override
void recordPosition(PositionRecorder recorder) throws IOException {
super.recordPosition(recorder);
- lengths.getPosition(recorder);
+ if (lengths != null) {
+ lengths.getPosition(recorder);
+ } else {
+ lengthsV2.getPosition(recorder);
+ }
}
}
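The writer-side selection mirrors the readers: when useDirectV2Encoding is set, the integer-valued streams are written with IntegerCompressionWriter and getEncoding() reports DIRECT_V2 in the stripe footer, which is the signal the readers' isDirectV2 flag can be keyed off. The helper below is a hypothetical illustration of that handshake; the method name is an assumption, since the code in RecordReaderImpl that actually sets isDirectV2 is not part of these hunks.

    // Hypothetical helper (name assumed, not from the patch) showing how a reader could
    // derive its isDirectV2 flag from the column encoding written by getEncoding().
    static boolean isDirectV2(OrcProto.ColumnEncoding encoding) {
      return encoding.getKind() == OrcProto.ColumnEncoding.Kind.DIRECT_V2;
    }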
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java.orig ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java.orig
new file mode 100644
index 0000000..d1e468c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java.orig
@@ -0,0 +1,1544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is synchronized so that multi-threaded access is ok. In
+ * particular, because the MemoryManager is shared between writers, this class
+ * assumes that checkMemory may be called from a separate thread.
+ */
+class WriterImpl implements Writer, MemoryManager.Callback {
+
+ private static final Log LOG = LogFactory.getLog(WriterImpl.class);
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ private final FileSystem fs;
+ private final Path path;
+ private final long stripeSize;
+ private final int rowIndexStride;
+ private final CompressionKind compress;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf outStream around streamFactory
+ private CodedOutputStream protobufWriter = null;
+ private long headerLength;
+ private int columnCount;
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private int rowsInIndex = 0;
+ private final List<OrcProto.StripeInformation> stripes =
+ new ArrayList<OrcProto.StripeInformation>();
+ private final Map<String, ByteString> userMetadata =
+ new TreeMap<String, ByteString>();
+ private final StreamFactory streamFactory = new StreamFactory();
+ private final TreeWriter treeWriter;
+ private final OrcProto.RowIndex.Builder rowIndex =
+ OrcProto.RowIndex.newBuilder();
+ private final boolean buildIndex;
+ private final MemoryManager memoryManager;
+
+ WriterImpl(FileSystem fs,
+ Path path,
+ ObjectInspector inspector,
+ long stripeSize,
+ CompressionKind compress,
+ int bufferSize,
+ int rowIndexStride,
+ MemoryManager memoryManager) throws IOException {
+ this.fs = fs;
+ this.path = path;
+ this.stripeSize = stripeSize;
+ this.compress = compress;
+ this.bufferSize = bufferSize;
+ this.rowIndexStride = rowIndexStride;
+ this.memoryManager = memoryManager;
+ buildIndex = rowIndexStride > 0;
+ codec = createCodec(compress);
+ treeWriter = createTreeWriter(inspector, streamFactory, false);
+ if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+ throw new IllegalArgumentException("Row stride must be at least " +
+ MIN_ROW_INDEX_STRIDE);
+ }
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, stripeSize, this);
+ }
+
+ static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ try {
+ Class<? extends CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ Class.forName("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO", e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
+ }
+ }
+
+ @Override
+ public synchronized boolean checkMemory(double newScale) throws IOException {
+ long limit = (long) Math.round(stripeSize * newScale);
+ long size = estimateStripeSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+ limit);
+ }
+ if (size > limit) {
+ flushStripe();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This class is used to hold the contents of streams as they are buffered.
+ * The TreeWriters write to the outStream and the codec compresses the
+ * data as buffers fill up and stores them in the output list. When the
+ * stripe is being written, the whole stream is written to the file.
+ */
+ private class BufferedStream implements OutStream.OutputReceiver {
+ private final OutStream outStream;
+ private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+ BufferedStream(String name, int bufferSize,
+ CompressionCodec codec) throws IOException {
+ outStream = new OutStream(name, bufferSize, codec, this);
+ }
+
+ /**
+ * Receive a buffer from the compression codec.
+ * @param buffer the buffer to save
+ * @throws IOException
+ */
+ @Override
+ public void output(ByteBuffer buffer) {
+ output.add(buffer);
+ }
+
+ /**
+ * Get the number of bytes in buffers that are allocated to this stream.
+ * @return number of bytes in buffers
+ */
+ public long getBufferSize() {
+ long result = 0;
+ for(ByteBuffer buf: output) {
+ result += buf.capacity();
+ }
+ return outStream.getBufferSize() + result;
+ }
+
+ /**
+ * Flush the stream to the codec.
+ * @throws IOException
+ */
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * Clear all of the buffers.
+ * @throws IOException
+ */
+ public void clear() throws IOException {
+ outStream.clear();
+ output.clear();
+ }
+
+ /**
+ * Check the state of suppress flag in output stream
+ * @return value of suppress flag
+ */
+ public boolean isSuppressed() {
+ return outStream.isSuppressed();
+ }
+
+ /**
+ * Write the saved compressed buffers to the OutputStream.
+ * @param out the stream to write to
+ * @throws IOException
+ */
+ void spillTo(OutputStream out) throws IOException {
+ for(ByteBuffer buffer: output) {
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return outStream.toString();
+ }
+ }
+
+ /**
+ * An output receiver that writes the ByteBuffers to the output stream
+ * as they are received.
+ */
+ private class DirectStream implements OutStream.OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ private static class RowIndexPositionRecorder implements PositionRecorder {
+ private final OrcProto.RowIndexEntry.Builder builder;
+
+ RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public void addPosition(long position) {
+ builder.addPositions(position);
+ }
+ }
+
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return The output outStream that the section needs to be written to.
+ * @throws IOException
+ */
+ public OutStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ StreamName name = new StreamName(column, kind);
+ BufferedStream result = streams.get(name);
+ if (result == null) {
+ result = new BufferedStream(name.toString(), bufferSize, codec);
+ streams.put(name, result);
+ }
+ return result.outStream;
+ }
+
+ /**
+ * Get the next column id.
+ * @return a number from 0 to the number of columns - 1
+ */
+ public int getNextColumnId() {
+ return columnCount++;
+ }
+
+ /**
+ * Get the stride rate of the row index.
+ */
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ /**
+ * Should be building the row index.
+ * @return true if we are building the index
+ */
+ public boolean buildIndex() {
+ return buildIndex;
+ }
+
+ /**
+ * Is the ORC file compressed?
+ * @return are the streams compressed
+ */
+ public boolean isCompressed() {
+ return codec != null;
+ }
+ }
+
+ /**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have children tree writers that write the children
+ * types.
+ */
+ private abstract static class TreeWriter {
+ protected final int id;
+ protected final ObjectInspector inspector;
+ private final BitFieldWriter isPresent;
+ private final boolean isCompressed;
+ protected final ColumnStatisticsImpl indexStatistics;
+ private final ColumnStatisticsImpl fileStatistics;
+ protected TreeWriter[] childrenWriters;
+ protected final RowIndexPositionRecorder rowIndexPosition;
+ private final OrcProto.RowIndex.Builder rowIndex;
+ private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+ private final PositionedOutputStream rowIndexStream;
+ private boolean foundNulls;
+ private OutStream isPresentOutStream;
+
+ /**
+ * Create a tree writer.
+ * @param columnId the column id of the column to write
+ * @param inspector the object inspector to use
+ * @param streamFactory limited access to the Writer's data.
+ * @param nullable can the value be null?
+ * @throws IOException
+ */
+ TreeWriter(int columnId, ObjectInspector inspector,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ this.isCompressed = streamFactory.isCompressed();
+ this.id = columnId;
+ this.inspector = inspector;
+ if (nullable) {
+ isPresentOutStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.PRESENT);
+ isPresent = new BitFieldWriter(isPresentOutStream, 1);
+ } else {
+ isPresent = null;
+ }
+ this.foundNulls = false;
+ indexStatistics = ColumnStatisticsImpl.create(inspector);
+ fileStatistics = ColumnStatisticsImpl.create(inspector);
+ childrenWriters = new TreeWriter[0];
+ rowIndex = OrcProto.RowIndex.newBuilder();
+ rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+ rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+ if (streamFactory.buildIndex()) {
+ rowIndexStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.ROW_INDEX);
+ } else {
+ rowIndexStream = null;
+ }
+ }
+
+ protected OrcProto.RowIndex.Builder getRowIndex() {
+ return rowIndex;
+ }
+
+ protected ColumnStatisticsImpl getFileStatistics() {
+ return fileStatistics;
+ }
+
+ protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+ return rowIndexEntry;
+ }
+
+ /**
+ * Add a new value to the column.
+ * @param obj
+ * @throws IOException
+ */
+ void write(Object obj) throws IOException {
+ if (obj != null) {
+ indexStatistics.increment();
+ }
+ if (isPresent != null) {
+ isPresent.write(obj == null ? 0 : 1);
+ if(obj == null) {
+ foundNulls = true;
+ }
+ }
+ }
+
+ private void removeIsPresentPositions() {
+ for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+ RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+ List<Long> positions = entry.getPositionsList();
+ // bit streams use 3 positions if uncompressed, 4 if compressed
+ positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+ entry.clearPositions();
+ entry.addAllPositions(positions);
+ }
+ }
+
+ /**
+ * Write the stripe out to the file.
+ * @param builder the stripe footer that contains the information about the
+ * layout of the stripe. The TreeWriter is required to update
+ * the footer with its information.
+ * @param requiredIndexEntries the number of index entries that are
+ * required. this is to check to make sure the
+ * row index is well formed.
+ * @throws IOException
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ if (isPresent != null) {
+ isPresent.flush();
+
+ // if no nulls are found in a stream, then suppress the stream
+ if(!foundNulls) {
+ isPresentOutStream.suppress();
+ // since isPresent bitstream is suppressed, update the index to
+ // remove the positions of the isPresent stream
+ if (rowIndexStream != null) {
+ removeIsPresentPositions();
+ }
+ }
+ }
+
+ // reset the flag for next stripe
+ foundNulls = false;
+
+ builder.addColumns(getEncoding());
+ if (rowIndexStream != null) {
+ if (rowIndex.getEntryCount() != requiredIndexEntries) {
+ throw new IllegalArgumentException("Column has wrong number of " +
+ "index entries found: " + rowIndexEntry + " expected: " +
+ requiredIndexEntries);
+ }
+ rowIndex.build().writeTo(rowIndexStream);
+ rowIndexStream.flush();
+ }
+ rowIndex.clear();
+ rowIndexEntry.clear();
+ }
+
+ TreeWriter[] getChildrenWriters() {
+ return childrenWriters;
+ }
+
+ /**
+ * Get the encoding for this column.
+ * @return the information about the encoding of this column
+ */
+ OrcProto.ColumnEncoding getEncoding() {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ /**
+ * Create a row index entry with the previous location and the current
+ * index statistics. Also merges the index statistics into the file
+ * statistics before they are cleared. Finally, it records the start of the
+ * next index and ensures all of the children columns also create an entry.
+ * @throws IOException
+ */
+ void createRowIndexEntry() throws IOException {
+ fileStatistics.merge(indexStatistics);
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ rowIndex.addEntry(rowIndexEntry);
+ rowIndexEntry.clear();
+ recordPosition(rowIndexPosition);
+ for(TreeWriter child: childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ /**
+ * Record the current position in each of this column's streams.
+ * @param recorder where should the locations be recorded
+ * @throws IOException
+ */
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ if (isPresent != null) {
+ isPresent.getPosition(recorder);
+ }
+ }
+
+ /**
+ * Estimate how much memory the writer is consuming excluding the streams.
+ * @return the number of bytes.
+ */
+ long estimateMemory() {
+ long result = 0;
+ for (TreeWriter child: childrenWriters) {
+ result += child.estimateMemory();
+ }
+ return result;
+ }
+ }
+
+ private static class BooleanTreeWriter extends TreeWriter {
+ private final BitFieldWriter writer;
+
+ BooleanTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ PositionedOutputStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.writer = new BitFieldWriter(out, 1);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ boolean val = ((BooleanObjectInspector) inspector).get(obj);
+ indexStatistics.updateBoolean(val);
+ writer.write(val ? 1 : 0);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class ByteTreeWriter extends TreeWriter {
+ private final RunLengthByteWriter writer;
+
+ ByteTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.writer = new RunLengthByteWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA));
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ byte val = ((ByteObjectInspector) inspector).get(obj);
+ indexStatistics.updateInteger(val);
+ writer.write(val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class IntegerTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter writer;
+ private final ShortObjectInspector shortInspector;
+ private final IntObjectInspector intInspector;
+ private final LongObjectInspector longInspector;
+
+ IntegerTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ PositionedOutputStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.writer = new RunLengthIntegerWriter(out, true);
+ if (inspector instanceof IntObjectInspector) {
+ intInspector = (IntObjectInspector) inspector;
+ shortInspector = null;
+ longInspector = null;
+ } else {
+ intInspector = null;
+ if (inspector instanceof LongObjectInspector) {
+ longInspector = (LongObjectInspector) inspector;
+ shortInspector = null;
+ } else {
+ shortInspector = (ShortObjectInspector) inspector;
+ longInspector = null;
+ }
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ long val;
+ if (intInspector != null) {
+ val = intInspector.get(obj);
+ } else if (longInspector != null) {
+ val = longInspector.get(obj);
+ } else {
+ val = shortInspector.get(obj);
+ }
+ indexStatistics.updateInteger(val);
+ writer.write(val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class FloatTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+
+ FloatTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ float val = ((FloatObjectInspector) inspector).get(obj);
+ indexStatistics.updateDouble(val);
+ SerializationUtils.writeFloat(stream, val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+ }
+
+ private static class DoubleTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+
+ DoubleTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ double val = ((DoubleObjectInspector) inspector).get(obj);
+ indexStatistics.updateDouble(val);
+ SerializationUtils.writeDouble(stream, val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+ }
+
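+ // Strings are dictionary encoded: each distinct value is kept in a
+ // red-black tree and each row records its dictionary id. When the stripe
+ // is flushed the dictionary is written in sorted order (DICTIONARY_DATA
+ // and LENGTH streams) and the saved row ids are remapped into that order
+ // before being written to the DATA stream.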
+ private static class StringTreeWriter extends TreeWriter {
+ private static final int INITIAL_DICTIONARY_SIZE = 4096;
+ private final PositionedOutputStream stringOutput;
+ private final RunLengthIntegerWriter lengthOutput;
+ private final RunLengthIntegerWriter rowOutput;
+ private final StringRedBlackTree dictionary =
+ new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+ private final DynamicIntArray rows = new DynamicIntArray();
+ private final List<OrcProto.RowIndexEntry> savedRowIndex =
+ new ArrayList<OrcProto.RowIndexEntry>();
+ private final boolean buildIndex;
+ private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+
+ StringTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ stringOutput = writer.createStream(id,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+ buildIndex = writer.buildIndex();
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ String val = ((StringObjectInspector) inspector)
+ .getPrimitiveJavaObject(obj);
+ rows.add(dictionary.add(val));
+ indexStatistics.updateString(val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ // Traverse the red-black tree writing out the bytes and lengths; and
+ // creating the map from the original order to the final sorted order.
+ final int[] dumpOrder = new int[dictionary.size()];
+ dictionary.visit(new StringRedBlackTree.Visitor() {
+ private int currentId = 0;
+ @Override
+ public void visit(StringRedBlackTree.VisitorContext context
+ ) throws IOException {
+ context.writeBytes(stringOutput);
+ lengthOutput.write(context.getLength());
+ dumpOrder[context.getOriginalPosition()] = currentId++;
+ }
+ });
+ int length = rows.size();
+ int rowIndexEntry = 0;
+ OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+ // write the values translated into the dump order.
+ for(int i = 0; i <= length; ++i) {
+ // now that we are writing out the row values, we can finalize the
+ // row index
+ if (buildIndex) {
+ while (i == rowIndexValueCount.get(rowIndexEntry) &&
+ rowIndexEntry < savedRowIndex.size()) {
+ OrcProto.RowIndexEntry.Builder base =
+ savedRowIndex.get(rowIndexEntry++).toBuilder();
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ rowIndex.addEntry(base.build());
+ }
+ }
+ if (i != length) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ }
+ }
+ // we need to build the rowindex before calling super, since it
+ // writes it out.
+ super.writeStripe(builder, requiredIndexEntries);
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ // reset all of the fields to be ready for the next stripe.
+ dictionary.clear();
+ rows.clear();
+ savedRowIndex.clear();
+ rowIndexValueCount.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY).
+ setDictionarySize(dictionary.size()).build();
+ }
+
+ /**
+ * This method doesn't call the super method, because unlike most of the
+ * other TreeWriters, this one can't record the position in the streams
+ * until the stripe is being flushed. Therefore it saves all of the entries
+ * and augments them with the final information as the stripe is written.
+ * @throws IOException
+ */
+ @Override
+ void createRowIndexEntry() throws IOException {
+ getFileStatistics().merge(indexStatistics);
+ OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ savedRowIndex.add(rowIndexEntry.build());
+ rowIndexEntry.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(Long.valueOf(rows.size()));
+ }
+
+ @Override
+ long estimateMemory() {
+ return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+ }
+ }
+
+ private static class BinaryTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+ private final RunLengthIntegerWriter length;
+
+ BinaryTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.length = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ BytesWritable val =
+ ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
+ stream.write(val.getBytes(), 0, val.getLength());
+ length.write(val.getLength());
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ length.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ length.getPosition(recorder);
+ }
+ }
+
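+ // Timestamps are split across two streams: the DATA stream holds whole
+ // seconds relative to the 2015-01-01 base below, and the SECONDARY stream
+ // holds the nanosecond part encoded by formatNanos.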
+ static final int MILLIS_PER_SECOND = 1000;
+ static final long BASE_TIMESTAMP =
+ Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
+
+ private static class TimestampTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter seconds;
+ private final RunLengthIntegerWriter nanos;
+
+ TimestampTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ Timestamp val =
+ ((TimestampObjectInspector) inspector).
+ getPrimitiveJavaObject(obj);
+ seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
+ nanos.write(formatNanos(val.getNanos()));
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ seconds.flush();
+ nanos.flush();
+ recordPosition(rowIndexPosition);
+ }
+
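+ // Packs the nanosecond value by stripping trailing decimal zeros (when
+ // there are at least two of them) and recording the number removed,
+ // minus one, in the low three bits; the reader reverses this to recover
+ // the original value.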
+ private static long formatNanos(int nanos) {
+ if (nanos == 0) {
+ return 0;
+ } else if (nanos % 100 != 0) {
+ return ((long) nanos) << 3;
+ } else {
+ nanos /= 100;
+ int trailingZeros = 1;
+ while (nanos % 10 == 0 && trailingZeros < 7) {
+ nanos /= 10;
+ trailingZeros += 1;
+ }
+ return ((long) nanos) << 3 | trailingZeros;
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ seconds.getPosition(recorder);
+ nanos.getPosition(recorder);
+ }
+ }
+
+ private static class DecimalTreeWriter extends TreeWriter {
+ private final PositionedOutputStream valueStream;
+ private final RunLengthIntegerWriter scaleStream;
+
+ DecimalTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector).
+ getPrimitiveJavaObject(obj);
+ SerializationUtils.writeBigInteger(valueStream,
+ decimal.unscaledValue());
+ scaleStream.write(decimal.scale());
+ indexStatistics.updateDecimal(decimal);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ valueStream.flush();
+ scaleStream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ valueStream.getPosition(recorder);
+ scaleStream.getPosition(recorder);
+ }
+ }
+
+ private static class StructTreeWriter extends TreeWriter {
+ private final List<? extends StructField> fields;
+ StructTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ StructObjectInspector structObjectInspector =
+ (StructObjectInspector) inspector;
+ fields = structObjectInspector.getAllStructFieldRefs();
+ childrenWriters = new TreeWriter[fields.size()];
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] = createTreeWriter(
+ fields.get(i).getFieldObjectInspector(), writer, true);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ StructObjectInspector insp = (StructObjectInspector) inspector;
+ for(int i = 0; i < fields.size(); ++i) {
+ StructField field = fields.get(i);
+ TreeWriter writer = childrenWriters[i];
+ writer.write(insp.getStructFieldData(obj, field));
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ private static class ListTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter lengths;
+
+ ListTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
+ childrenWriters = new TreeWriter[1];
+ childrenWriters[0] =
+ createTreeWriter(listObjectInspector.getListElementObjectInspector(),
+ writer, true);
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ ListObjectInspector insp = (ListObjectInspector) inspector;
+ int len = insp.getListLength(obj);
+ lengths.write(len);
+ for(int i=0; i < len; ++i) {
+ childrenWriters[0].write(insp.getListElement(obj, i));
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ lengths.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+ }
+
+ private static class MapTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter lengths;
+
+ MapTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ MapObjectInspector insp = (MapObjectInspector) inspector;
+ childrenWriters = new TreeWriter[2];
+ childrenWriters[0] =
+ createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
+ childrenWriters[1] =
+ createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ MapObjectInspector insp = (MapObjectInspector) inspector;
+ int len = insp.getMapSize(obj);
+ lengths.write(len);
+ // this sucks, but it will have to do until we can get a better
+ // accessor in the MapObjectInspector.
+ Map<?, ?> valueMap = insp.getMap(obj);
+ for(Map.Entry<?, ?> entry: valueMap.entrySet()) {
+ childrenWriters[0].write(entry.getKey());
+ childrenWriters[1].write(entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ lengths.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+ }
+
+ private static class UnionTreeWriter extends TreeWriter {
+ private final RunLengthByteWriter tags;
+
+ UnionTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ UnionObjectInspector insp = (UnionObjectInspector) inspector;
+ List<ObjectInspector> choices = insp.getObjectInspectors();
+ childrenWriters = new TreeWriter[choices.size()];
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] = createTreeWriter(choices.get(i), writer, true);
+ }
+ tags =
+ new RunLengthByteWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.DATA));
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ UnionObjectInspector insp = (UnionObjectInspector) inspector;
+ byte tag = insp.getTag(obj);
+ tags.write(tag);
+ childrenWriters[tag].write(insp.getField(obj));
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ tags.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ tags.getPosition(recorder);
+ }
+ }
+
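+ // Recursively builds the tree of writers that mirrors the ObjectInspector
+ // schema, pulling a fresh column id from the StreamFactory for each node.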
+ private static TreeWriter createTreeWriter(ObjectInspector inspector,
+ StreamFactory streamFactory,
+ boolean nullable
+ ) throws IOException {
+ switch (inspector.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
+ case BOOLEAN:
+ return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case BYTE:
+ return new ByteTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case FLOAT:
+ return new FloatTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case DOUBLE:
+ return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case STRING:
+ return new StringTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case BINARY:
+ return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case TIMESTAMP:
+ return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case DECIMAL:
+ return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad primitive category " +
+ ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
+ }
+ case STRUCT:
+ return new StructTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ case MAP:
+ return new MapTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ case LIST:
+ return new ListTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ case UNION:
+ return new UnionTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad category: " +
+ inspector.getCategory());
+ }
+ }
+
+ private static void writeTypes(OrcProto.Footer.Builder builder,
+ TreeWriter treeWriter) {
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ switch (treeWriter.inspector.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveObjectInspector) treeWriter.inspector).
+ getPrimitiveCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown primitive category: " +
+ ((PrimitiveObjectInspector) treeWriter.inspector).
+ getPrimitiveCategory());
+ }
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(treeWriter.childrenWriters[0].id);
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ type.addSubtypes(treeWriter.childrenWriters[0].id);
+ type.addSubtypes(treeWriter.childrenWriters[1].id);
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TreeWriter child: treeWriter.childrenWriters) {
+ type.addSubtypes(child.id);
+ }
+ for(StructField field: ((StructTreeWriter) treeWriter).fields) {
+ type.addFieldNames(field.getFieldName());
+ }
+ break;
+ case UNION:
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for(TreeWriter child: treeWriter.childrenWriters) {
+ type.addSubtypes(child.id);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " +
+ treeWriter.inspector.getCategory());
+ }
+ builder.addTypes(type);
+ for(TreeWriter child: treeWriter.childrenWriters) {
+ writeTypes(builder, child);
+ }
+ }
+
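+ // Lazily creates the physical file on first use, writes the ORC magic
+ // string, and sets up the (possibly compressed) stream used for stripe
+ // footers and file metadata.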
+ private void ensureWriter() throws IOException {
+ if (rawWriter == null) {
+ rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+ fs.getDefaultReplication(),
+ Math.min(stripeSize * 2L, Integer.MAX_VALUE));
+ rawWriter.writeBytes(OrcFile.MAGIC);
+ headerLength = rawWriter.getPos();
+ writer = new OutStream("metadata", bufferSize, codec,
+ new DirectStream(rawWriter));
+ protobufWriter = CodedOutputStream.newInstance(writer);
+ }
+ }
+
+ private void createRowIndexEntry() throws IOException {
+ treeWriter.createRowIndexEntry();
+ rowsInIndex = 0;
+ }
+
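+ // Flushes the current stripe: finishes any pending row index entry, lets
+ // the tree writers serialize their data, spills every unsuppressed stream
+ // to the file, appends the stripe footer, and records the stripe's
+ // offsets and lengths for the file footer.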
+ private void flushStripe() throws IOException {
+ ensureWriter();
+ if (buildIndex && rowsInIndex != 0) {
+ createRowIndexEntry();
+ }
+ if (rowsInStripe != 0) {
+ int requiredIndexEntries = rowIndexStride == 0 ? 0 :
+ (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+ OrcProto.StripeFooter.Builder builder =
+ OrcProto.StripeFooter.newBuilder();
+ treeWriter.writeStripe(builder, requiredIndexEntries);
+ long start = rawWriter.getPos();
+ long section = start;
+ long indexEnd = start;
+ for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+ BufferedStream stream = pair.getValue();
+ if (!stream.isSuppressed()) {
+ stream.flush();
+ stream.spillTo(rawWriter);
+ long end = rawWriter.getPos();
+ StreamName name = pair.getKey();
+ builder.addStreams(OrcProto.Stream.newBuilder()
+ .setColumn(name.getColumn())
+ .setKind(name.getKind())
+ .setLength(end-section));
+ section = end;
+ if (StreamName.Area.INDEX == name.getArea()) {
+ indexEnd = end;
+ }
+ }
+ stream.clear();
+ }
+ builder.build().writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ long end = rawWriter.getPos();
+ OrcProto.StripeInformation dirEntry =
+ OrcProto.StripeInformation.newBuilder()
+ .setOffset(start)
+ .setIndexLength(indexEnd - start)
+ .setDataLength(section - indexEnd)
+ .setNumberOfRows(rowsInStripe)
+ .setFooterLength(end - section).build();
+ stripes.add(dirEntry);
+ rowCount += rowsInStripe;
+ rowsInStripe = 0;
+ }
+ }
+
+ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+ switch (kind) {
+ case NONE: return OrcProto.CompressionKind.NONE;
+ case ZLIB: return OrcProto.CompressionKind.ZLIB;
+ case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+ case LZO: return OrcProto.CompressionKind.LZO;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + kind);
+ }
+ }
+
+ private void writeFileStatistics(OrcProto.Footer.Builder builder,
+ TreeWriter writer) throws IOException {
+ builder.addStatistics(writer.fileStatistics.serialize());
+ for(TreeWriter child: writer.getChildrenWriters()) {
+ writeFileStatistics(builder, child);
+ }
+ }
+
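+ // Writes the file footer: the type tree, stripe directory, per-column
+ // file statistics, and user metadata; returns its serialized length.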
+ private int writeFooter(long bodyLength) throws IOException {
+ ensureWriter();
+ OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ builder.setNumberOfRows(rowCount);
+ builder.setRowIndexStride(rowIndexStride);
+ // serialize the types
+ writeTypes(builder, treeWriter);
+ // add the stripe information
+ for(OrcProto.StripeInformation stripe: stripes) {
+ builder.addStripes(stripe);
+ }
+ // add the column statistics
+ writeFileStatistics(builder, treeWriter);
+ // add all of the user metadata
+ for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
+ builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
+ .setName(entry.getKey()).setValue(entry.getValue()));
+ }
+ long startPosn = rawWriter.getPos();
+ builder.build().writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ return (int) (rawWriter.getPos() - startPosn);
+ }
+
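+ // Writes the uncompressed postscript; its length must fit in a single
+ // byte because close() stores it as the last byte of the file.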
+ private int writePostScript(int footerLength) throws IOException {
+ OrcProto.PostScript.Builder builder =
+ OrcProto.PostScript.newBuilder()
+ .setCompression(writeCompressionKind(compress))
+ .setFooterLength(footerLength);
+ if (compress != CompressionKind.NONE) {
+ builder.setCompressionBlockSize(bufferSize);
+ }
+ OrcProto.PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosn = rawWriter.getPos();
+ ps.writeTo(rawWriter);
+ long length = rawWriter.getPos() - startPosn;
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too large at " + length);
+ }
+ return (int) length;
+ }
+
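+ // Estimates how much memory the in-flight stripe is using, based on the
+ // buffered stream sizes plus the tree writers' own buffers.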
+ private long estimateStripeSize() {
+ long result = 0;
+ for(BufferedStream stream: streams.values()) {
+ result += stream.getBufferSize();
+ }
+ result += treeWriter.estimateMemory();
+ return result;
+ }
+
+ @Override
+ public synchronized void addUserMetadata(String name, ByteBuffer value) {
+ userMetadata.put(name, ByteString.copyFrom(value));
+ }
+
+ @Override
+ public void addRow(Object row) throws IOException {
+ synchronized (this) {
+ treeWriter.write(row);
+ rowsInStripe += 1;
+ if (buildIndex) {
+ rowsInIndex += 1;
+
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
+ }
+ }
+ }
+ memoryManager.addedRow();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // remove us from the memory manager so that we don't get any callbacks
+ memoryManager.removeWriter(path);
+ // actually close the file
+ synchronized (this) {
+ flushStripe();
+ int footerLength = writeFooter(rawWriter.getPos());
+ rawWriter.writeByte(writePostScript(footerLength));
+ rawWriter.close();
+ }
+ }
+}
diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index d19bc06..994be70 100644
--- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -66,6 +66,7 @@ message ColumnEncoding {
enum Kind {
DIRECT = 0;
DICTIONARY = 1;
+ DIRECT_V2 = 2;
}
required Kind kind = 1;
optional uint32 dictionarySize = 2;
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
new file mode 100644
index 0000000..99f7d9a
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+public class TestBitPack {
+
+ private static final int SIZE = 100;
+ private static Random rand = new Random(100);
+
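+ // Despite the name, this applies zigzag encoding so that negative values
+ // also pack into a small number of bits.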
+ private long[] deltaEncode(long[] inp) {
+ long[] output = new long[inp.length];
+ for (int i = 0; i < inp.length; i++) {
+ output[i] = Utils.zigzagEncode(inp[i]);
+ }
+ return output;
+ }
+
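+ // Round-trip test: generate random values that need roughly numBits bits
+ // after zigzag encoding, pick the closest supported fixed width, pack and
+ // unpack them, and verify both the chosen width and the decoded values.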
+ private void runTest(int numBits) {
+ long[] inp = new long[SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ long val = 0;
+ if (numBits <= 32) {
+ val = rand.nextInt((int) Math.pow(2, numBits - 1));
+ } else {
+ val = Utils.nextLong(rand, (long) Math.pow(2, numBits - 2));
+ }
+ if (val % 2 == 0) {
+ val = -val;
+ }
+ inp[i] = val;
+ }
+ long[] deltaEncoded = deltaEncode(inp);
+ long minInput = Collections.min(Longs.asList(deltaEncoded));
+ long maxInput = Collections.max(Longs.asList(deltaEncoded));
+ long rangeInput = maxInput - minInput;
+ int fixedWidth = Utils.findNumBits(rangeInput);
+ fixedWidth = FixedBitSizes.getClosestFixedBits(fixedWidth);
+ BitPackWriter bpw = new BitPackWriter(fixedWidth);
+ byte[] packed = bpw.pack(deltaEncoded, deltaEncoded.length);
+
+ BitPackReader bpr = new BitPackReader(fixedWidth);
+ long[] deltaDec = bpr.unpack(packed, deltaEncoded.length);
+ long[] out = new long[deltaDec.length];
+ for (int i = 0; i < deltaDec.length; i++) {
+ out[i] = Utils.zigzagDecode(deltaDec[i]);
+ }
+ assertEquals(numBits, fixedWidth);
+ assertArrayEquals(inp, out);
+ }
+
+ @Test
+ public void test01BitPacking1Bit() {
+ long[] inp = new long[SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ inp[i] = -1 * rand.nextInt(2);
+ }
+ long[] deltaEncoded = deltaEncode(inp);
+ long minInput = Collections.min(Longs.asList(deltaEncoded));
+ long maxInput = Collections.max(Longs.asList(deltaEncoded));
+ long rangeInput = maxInput - minInput;
+ int fixedWidth = Utils.findNumBits(rangeInput);
+ fixedWidth = FixedBitSizes.getClosestFixedBits(fixedWidth);
+ BitPackWriter bpw = new BitPackWriter(fixedWidth);
+ byte[] packed = bpw.pack(deltaEncoded, deltaEncoded.length);
+ BitPackReader bpr = new BitPackReader(fixedWidth);
+ long[] deltaDec = bpr.unpack(packed, SIZE);
+ long[] out = new long[deltaDec.length];
+ for (int i = 0; i < deltaDec.length; i++) {
+ out[i] = Utils.zigzagDecode(deltaDec[i]);
+ }
+ assertEquals(1, fixedWidth);
+ assertArrayEquals(inp, out);
+ }
+
+ @Test
+ public void test02BitPacking2Bit() {
+ runTest(2);
+ }
+
+ @Test
+ public void test03BitPacking3Bit() {
+ runTest(3);
+ }
+
+ @Test
+ public void test04BitPacking4Bit() {
+ runTest(4);
+ }
+
+ @Test
+ public void test05BitPacking5Bit() {
+ runTest(5);
+ }
+
+ @Test
+ public void test06BitPacking6Bit() {
+ runTest(6);
+ }
+
+ @Test
+ public void test07BitPacking7Bit() {
+ runTest(7);
+ }
+
+ @Test
+ public void test08BitPacking8Bit() {
+ runTest(8);
+ }
+
+ @Test
+ public void test09BitPacking9Bit() {
+ runTest(9);
+ }
+
+ @Test
+ public void test10BitPacking10Bit() {
+ runTest(10);
+ }
+
+ @Test
+ public void test11BitPacking11Bit() {
+ runTest(11);
+ }
+
+ @Test
+ public void test12BitPacking12Bit() {
+ runTest(12);
+ }
+
+ @Test
+ public void test13BitPacking13Bit() {
+ runTest(13);
+ }
+
+ @Test
+ public void test14BitPacking14Bit() {
+ runTest(14);
+ }
+
+ @Test
+ public void test15BitPacking15Bit() {
+ runTest(15);
+ }
+
+ @Test
+ public void test16BitPacking16Bit() {
+ runTest(16);
+ }
+
+ @Test
+ public void test17BitPacking17Bit() {
+ runTest(17);
+ }
+
+ @Test
+ public void test18BitPacking18Bit() {
+ runTest(18);
+ }
+
+ @Test
+ public void test19BitPacking19Bit() {
+ runTest(19);
+ }
+
+ @Test
+ public void test20BitPacking20Bit() {
+ runTest(20);
+ }
+
+ @Test
+ public void test21BitPacking21Bit() {
+ runTest(21);
+ }
+
+ @Test
+ public void test22BitPacking22Bit() {
+ runTest(22);
+ }
+
+ @Test
+ public void test23BitPacking23Bit() {
+ runTest(23);
+ }
+
+ @Test
+ public void test24BitPacking24Bit() {
+ runTest(24);
+ }
+
+ @Test
+ public void test26BitPacking26Bit() {
+ runTest(26);
+ }
+
+ @Test
+ public void test28BitPacking28Bit() {
+ runTest(28);
+ }
+
+ @Test
+ public void test30BitPacking30Bit() {
+ runTest(30);
+ }
+
+ @Test
+ public void test32BitPacking32Bit() {
+ runTest(32);
+ }
+
+ @Test
+ public void test40BitPacking40Bit() {
+ runTest(40);
+ }
+
+ @Test
+ public void test48BitPacking48Bit() {
+ runTest(48);
+ }
+
+ @Test
+ public void test56BitPacking56Bit() {
+ runTest(56);
+ }
+
+ @Test
+ public void test64BitPacking64Bit() {
+ runTest(64);
+ }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
new file mode 100644
index 0000000..762e3d0
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class TestIntegerCompressionReader {
+
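+ // Writes runs, ramps and random values while recording the stream
+ // position before each one, then reads a prefix of the values back and
+ // seeks to each of those recorded positions to verify random access.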
+ public void runSeekTest(CompressionCodec codec) throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ IntegerCompressionWriter out = new IntegerCompressionWriter(
+ new OutStream("test", 1000, codec, collect), true);
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[4096];
+ Random random = new Random(99);
+ int[] junk = new int[2048];
+ for(int i=0; i < junk.length; ++i) {
+ junk[i] = random.nextInt();
+ }
+ for(int i=0; i < 4096; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ // test runs, incrementing runs, non-runs
+ if (i < 1024) {
+ out.write(i/4);
+ } else if (i < 2048) {
+ out.write(2*i);
+ } else {
+ out.write(junk[i-2048]);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ IntegerCompressionReader in = new IntegerCompressionReader(InStream.create
+ ("test", inBuf, codec, 1000), true);
+ for(int i=0; i < 2048; ++i) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ for(int i=2047; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ IntegerCompressionWriter out = new IntegerCompressionWriter(
+ new OutStream("test", 100, null, collect), true);
+ for(int i=0; i < 2048; ++i) {
+ if (i < 1024) {
+ out.write(i);
+ } else {
+ out.write(256 * i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ IntegerCompressionReader in = new IntegerCompressionReader(InStream.create
+ ("test", inBuf, null, 100), true);
+ for(int i=0; i < 2048; i += 10) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i, x);
+ } else {
+ assertEquals(256 * i, x);
+ }
+ if (i < 2038) {
+ in.skip(9);
+ }
+ in.skip(0);
+ }
+ }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
new file mode 100644
index 0000000..9cc4be6
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+public class TestNewIntegerEncoding {
+
+ public static class Row {
+ Integer int1;
+ Long long1;
+
+ public Row(int val, long l) {
+ this.int1 = val;
+ this.long1 = l;
+ }
+ }
+
+ public List<Long> fetchData(String path) throws IOException {
+ List<Long> input = new ArrayList<Long>();
+ FileInputStream stream = new FileInputStream(new File(path));
+ try {
+ FileChannel fc = stream.getChannel();
+ MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+ /* Instead of using default, pass in a decoder. */
+ String[] lines = Charset.defaultCharset().decode(bb).toString()
+ .split("\n");
+ for (String line : lines) {
+ long val = 0;
+ try {
+ val = Long.parseLong(line);
+ } catch (NumberFormatException e) {
+ // for now lets ignore (assign 0)
+ }
+ input.add(val);
+ }
+ } finally {
+ stream.close();
+ }
+ return input;
+ }
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir", "target"
+ + File.separator + "test" + File.separator + "tmp"));
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+ String resDir = "src/test/resources";
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ testFilePath = new Path(workDir, "TestOrcFile."
+ + testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
+
+ @Test
+ public void testBasicRow() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ writer.addRow(new Row(111, 1111L));
+ writer.addRow(new Row(111, 1111L));
+ writer.addRow(new Row(111, 1111L));
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0));
+ assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1));
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+ 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+ 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+ 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+ 1, 1, 1, 1 };
+ List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testIntegerMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add((long) Integer.MIN_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testIntegerMax() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add((long) Integer.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testLongMin() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add(Long.MIN_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testLongMax() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add(Long.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testRandomInt() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 100000; i++) {
+ input.add((long) rand.nextInt());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testRandomLong() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 100000; i++) {
+ input.add(rand.nextLong());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt0() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(0, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt1() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(1, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt255() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(255, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseAt256() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(256, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBase510() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(510, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBase511() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(100));
+ }
+ input.set(511, 20000L);
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.ZLIB, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+ Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 100000; i++) {
+ input.add((long) rand.nextInt());
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000,
+ CompressionKind.NONE, 10000, 10000);
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 55555;
+ rows.seekToRow(idx);
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
index 9f989fd..2f7a7f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.ql.io.orc;
import static junit.framework.Assert.assertEquals;
diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out
index 8b88931..50e8e91 100644
--- ql/src/test/resources/orc-file-dump.out
+++ ql/src/test/resources/orc-file-dump.out
@@ -11,73 +11,73 @@ Statistics:
Column 3: count: 21000 min: Darkness, max: worst
Stripes:
- Stripe: offset: 3 data: 69605 rows: 5000 tail: 72 index: 119
+ Stripe: offset: 3 data: 53140 rows: 4163 tail: 72 index: 119
Stream: column 0 section ROW_INDEX start: 3 length 10
Stream: column 1 section ROW_INDEX start: 13 length 35
Stream: column 2 section ROW_INDEX start: 48 length 39
Stream: column 3 section ROW_INDEX start: 87 length 35
- Stream: column 1 section DATA start: 122 length 22605
- Stream: column 2 section DATA start: 22727 length 43426
- Stream: column 3 section DATA start: 66153 length 3403
- Stream: column 3 section LENGTH start: 69556 length 38
- Stream: column 3 section DICTIONARY_DATA start: 69594 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stream: column 1 section DATA start: 122 length 16676
+ Stream: column 2 section DATA start: 16798 length 33334
+ Stream: column 3 section DATA start: 50132 length 2972
+ Stream: column 3 section LENGTH start: 53104 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 53129 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
- Stripe: offset: 69799 data: 69584 rows: 5000 tail: 73 index: 118
- Stream: column 0 section ROW_INDEX start: 69799 length 10
- Stream: column 1 section ROW_INDEX start: 69809 length 34
- Stream: column 2 section ROW_INDEX start: 69843 length 39
- Stream: column 3 section ROW_INDEX start: 69882 length 35
- Stream: column 1 section DATA start: 69917 length 22597
- Stream: column 2 section DATA start: 92514 length 43439
- Stream: column 3 section DATA start: 135953 length 3377
- Stream: column 3 section LENGTH start: 139330 length 38
- Stream: column 3 section DICTIONARY_DATA start: 139368 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stripe: offset: 53334 data: 63747 rows: 5000 tail: 73 index: 120
+ Stream: column 0 section ROW_INDEX start: 53334 length 10
+ Stream: column 1 section ROW_INDEX start: 53344 length 36
+ Stream: column 2 section ROW_INDEX start: 53380 length 39
+ Stream: column 3 section ROW_INDEX start: 53419 length 35
+ Stream: column 1 section DATA start: 53454 length 20029
+ Stream: column 2 section DATA start: 73483 length 40035
+ Stream: column 3 section DATA start: 113518 length 3525
+ Stream: column 3 section LENGTH start: 117043 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 117068 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
- Stripe: offset: 139574 data: 69570 rows: 5000 tail: 73 index: 120
- Stream: column 0 section ROW_INDEX start: 139574 length 10
- Stream: column 1 section ROW_INDEX start: 139584 length 36
- Stream: column 2 section ROW_INDEX start: 139620 length 39
- Stream: column 3 section ROW_INDEX start: 139659 length 35
- Stream: column 1 section DATA start: 139694 length 22594
- Stream: column 2 section DATA start: 162288 length 43415
- Stream: column 3 section DATA start: 205703 length 3390
- Stream: column 3 section LENGTH start: 209093 length 38
- Stream: column 3 section DICTIONARY_DATA start: 209131 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stripe: offset: 117274 data: 63767 rows: 5000 tail: 73 index: 120
+ Stream: column 0 section ROW_INDEX start: 117274 length 10
+ Stream: column 1 section ROW_INDEX start: 117284 length 36
+ Stream: column 2 section ROW_INDEX start: 117320 length 39
+ Stream: column 3 section ROW_INDEX start: 117359 length 35
+ Stream: column 1 section DATA start: 117394 length 20029
+ Stream: column 2 section DATA start: 137423 length 40035
+ Stream: column 3 section DATA start: 177458 length 3545
+ Stream: column 3 section LENGTH start: 181003 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 181028 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
- Stripe: offset: 209337 data: 69551 rows: 5000 tail: 72 index: 119
- Stream: column 0 section ROW_INDEX start: 209337 length 10
- Stream: column 1 section ROW_INDEX start: 209347 length 35
- Stream: column 2 section ROW_INDEX start: 209382 length 39
- Stream: column 3 section ROW_INDEX start: 209421 length 35
- Stream: column 1 section DATA start: 209456 length 22575
- Stream: column 2 section DATA start: 232031 length 43426
- Stream: column 3 section DATA start: 275457 length 3379
- Stream: column 3 section LENGTH start: 278836 length 38
- Stream: column 3 section DICTIONARY_DATA start: 278874 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
- Encoding column 3: DICTIONARY[35]
- Stripe: offset: 279079 data: 14096 rows: 1000 tail: 68 index: 120
- Stream: column 0 section ROW_INDEX start: 279079 length 10
- Stream: column 1 section ROW_INDEX start: 279089 length 36
- Stream: column 2 section ROW_INDEX start: 279125 length 39
- Stream: column 3 section ROW_INDEX start: 279164 length 35
- Stream: column 1 section DATA start: 279199 length 4529
- Stream: column 2 section DATA start: 283728 length 8690
- Stream: column 3 section DATA start: 292418 length 706
- Stream: column 3 section LENGTH start: 293124 length 38
- Stream: column 3 section DICTIONARY_DATA start: 293162 length 133
- Encoding column 0: DIRECT
- Encoding column 1: DIRECT
- Encoding column 2: DIRECT
+ Stripe: offset: 181234 data: 63786 rows: 5000 tail: 73 index: 120
+ Stream: column 0 section ROW_INDEX start: 181234 length 10
+ Stream: column 1 section ROW_INDEX start: 181244 length 36
+ Stream: column 2 section ROW_INDEX start: 181280 length 39
+ Stream: column 3 section ROW_INDEX start: 181319 length 35
+ Stream: column 1 section DATA start: 181354 length 20029
+ Stream: column 2 section DATA start: 201383 length 40035
+ Stream: column 3 section DATA start: 241418 length 3564
+ Stream: column 3 section LENGTH start: 244982 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 245007 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY[35]
+ Stripe: offset: 245213 data: 23583 rows: 1837 tail: 68 index: 120
+ Stream: column 0 section ROW_INDEX start: 245213 length 10
+ Stream: column 1 section ROW_INDEX start: 245223 length 36
+ Stream: column 2 section ROW_INDEX start: 245259 length 39
+ Stream: column 3 section ROW_INDEX start: 245298 length 35
+ Stream: column 1 section DATA start: 245333 length 7359
+ Stream: column 2 section DATA start: 252692 length 14710
+ Stream: column 3 section DATA start: 267402 length 1356
+ Stream: column 3 section LENGTH start: 268758 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 268783 length 133
+ Encoding column 0: DIRECT_V2
+ Encoding column 1: DIRECT_V2
+ Encoding column 2: DIRECT_V2
+ Encoding column 3: DICTIONARY[35]
\ No newline at end of file
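
The updated expected output above reflects the new DIRECT_V2 column encoding: the DATA and LENGTH streams compress more tightly, so the per-stripe data sizes shrink and every stripe offset shifts, while the total row count still sums to 21000 (4163 + 5000 + 5000 + 5000 + 1837). If this golden file ever needs regenerating after further writer changes, a minimal sketch along the following lines could reproduce equivalent output, assuming the FileDump utility in this package accepts ORC file paths on the command line; the input path shown is hypothetical and not part of this patch.

// Minimal sketch (not part of this patch): regenerate a dump in the same
// format as orc-file-dump.out by pointing FileDump at an ORC file.
import org.apache.hadoop.hive.ql.io.orc.FileDump;

public class RegenerateOrcDump {
  public static void main(String[] args) throws Exception {
    // Hypothetical path to an ORC file written with the DIRECT_V2 writer;
    // in the test suite the golden file is produced from the file that
    // TestFileDump writes in its working directory.
    String orcFile = args.length > 0 ? args[0] : "/tmp/TestFileDump.testDump.orc";
    // FileDump prints the statistics, stripe, stream, and encoding lines
    // to stdout, which is what the expected output above captures.
    FileDump.main(new String[]{orcFile});
  }
}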