diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java index f6acc00..cce1415 100644 --- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java +++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java @@ -5157,10 +5157,12 @@ public ColumnEncoding getDefaultInstanceForType() { implements com.google.protobuf.ProtocolMessageEnum { DIRECT(0, 0), DICTIONARY(1, 1), + DIRECT_V2(2, 2), ; public static final int DIRECT_VALUE = 0; public static final int DICTIONARY_VALUE = 1; + public static final int DIRECT_V2_VALUE = 2; public final int getNumber() { return value; } @@ -5169,6 +5171,7 @@ public static Kind valueOf(int value) { switch (value) { case 0: return DIRECT; case 1: return DICTIONARY; + case 2: return DIRECT_V2; default: return null; } } @@ -5199,7 +5202,7 @@ public Kind findValueByNumber(int number) { } private static final Kind[] VALUES = { - DIRECT, DICTIONARY, + DIRECT, DICTIONARY, DIRECT_V2, }; public static Kind valueOf( @@ -10568,41 +10571,42 @@ void setMagic(com.google.protobuf.ByteString value) { "nd\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Ki" + "nd\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n" + "\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004" + - "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n\016Colum", + "\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\240\001\n\016Colum", "nEncoding\022C\n\004kind\030\001 \002(\01625.org.apache.had" + "oop.hive.ql.io.orc.ColumnEncoding.Kind\022\026" + - "\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006DIRECT" + - "\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFooter\0229\n\007s" + - "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" + - ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap" + - "ache.hadoop.hive.ql.io.orc.ColumnEncodin" + - "g\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" + - "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" + - "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\260\001\n\004Kind", - "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" + - "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" + - "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" + - "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" + - "\007DECIMAL\020\016\"x\n\021StripeInformation\022\016\n\006offse" + - "t\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLeng" + - "th\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014number" + - "OfRows\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name" + - "\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Footer\022\024\n\014head" + - "erLength\030\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n", - "\007stripes\030\003 \003(\01323.org.apache.hadoop.hive." 
+ - "ql.io.orc.StripeInformation\0225\n\005types\030\004 \003" + - "(\0132&.org.apache.hadoop.hive.ql.io.orc.Ty" + - "pe\022D\n\010metadata\030\005 \003(\01322.org.apache.hadoop" + - ".hive.ql.io.orc.UserMetadataItem\022\024\n\014numb" + - "erOfRows\030\006 \001(\004\022F\n\nstatistics\030\007 \003(\01322.org" + - ".apache.hadoop.hive.ql.io.orc.ColumnStat" + - "istics\022\026\n\016rowIndexStride\030\010 \001(\r\"\255\001\n\nPostS" + - "cript\022\024\n\014footerLength\030\001 \001(\004\022F\n\013compressi" + - "on\030\002 \001(\01621.org.apache.hadoop.hive.ql.io.", - "orc.CompressionKind\022\034\n\024compressionBlockS" + - "ize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001\022\016\n\005magic\030" + - "\300> \001(\t*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004Z" + - "LIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003" + "\n\016dictionarySize\030\002 \001(\r\"1\n\004Kind\022\n\n\006DIRECT" + + "\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\"\214\001\n\014St" + + "ripeFooter\0229\n\007streams\030\001 \003(\0132(.org.apache" + + ".hadoop.hive.ql.io.orc.Stream\022A\n\007columns" + + "\030\002 \003(\01320.org.apache.hadoop.hive.ql.io.or" + + "c.ColumnEncoding\"\236\002\n\004Type\0229\n\004kind\030\001 \002(\0162" + + "+.org.apache.hadoop.hive.ql.io.orc.Type." + + "Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames", + "\030\003 \003(\t\"\260\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t" + + "\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022" + + "\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tT" + + "IMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020" + + "\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\"x\n\021StripeInfor" + + "mation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 " + + "\001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030" + + "\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetad" + + "ataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n" + + "\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontent", + "Length\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apac" + + "he.hadoop.hive.ql.io.orc.StripeInformati" + + "on\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.hi" + + "ve.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.or" + + "g.apache.hadoop.hive.ql.io.orc.UserMetad" + + "ataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatist" + + "ics\030\007 \003(\01322.org.apache.hadoop.hive.ql.io" + + ".orc.ColumnStatistics\022\026\n\016rowIndexStride\030" + + "\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001" + + "(\004\022F\n\013compression\030\002 \001(\01621.org.apache.had", + "oop.hive.ql.io.orc.CompressionKind\022\034\n\024co" + + "mpressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(" + + "\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind" + + "\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020" + + "\003" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackReader.java new file mode 100644 index 0000000..93baf6d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackReader.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +/** + * Reader which unpacks the bit packed values. + */ +class BitPackReader { + // current bit position + private int current; + + // bits left in a byte + private int bitsLeft; + private int numRead; + + // bit packed array + private byte[] packed; + private int numBits; + + BitPackReader(int numBits) { + this.packed = null; + this.current = 0; + this.bitsLeft = 0; + this.numBits = numBits; + this.numRead = 0; + } + + /** + * Reads next byte (as integer) from bit packed array + */ + private void readByte() { + current = 0xff & packed[numRead++]; + bitsLeft = 8; + } + + /** + * Unpack the bit packed input array till input array's length. + * + * @param inp + * - bit packed array + * @return unpacked values. null is returned if input array is null or empty. + */ + long[] unpack(byte[] inp) { + if (inp == null || inp.length == 0) { + return null; + } + + return unpack(inp, inp.length); + } + + /** + * Unpack the bit packed input array till the specified length. + * + * @param inp + * - bit packed array + * @param n + * - number of elements in the input array to unpack + * @return unpacked values. null for incorrect arguments + */ + long[] unpack(byte[] inp, int n) { + if (inp == null || inp.length < 1 || n < 1 || numBits < 1) { + return null; + } + + this.packed = inp; + + // output unpacked array + long[] unpacked = new long[n]; + if (inp.length > 0) { + readByte(); + } + + for (int i = 0; i < n; i++) { + long result = 0; + int bitsLeftToRead = numBits; + while (bitsLeftToRead > bitsLeft) { + result <<= bitsLeft; + result |= current & ((1 << bitsLeft) - 1); + bitsLeftToRead -= bitsLeft; + readByte(); + } + + // handle the left over bits + if (bitsLeftToRead > 0) { + result <<= bitsLeftToRead; + bitsLeft -= bitsLeftToRead; + result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1); + } + + unpacked[i] = result; + } + return unpacked; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackWriter.java new file mode 100644 index 0000000..24bc828 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitPackWriter.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
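The unpack loop above consumes bits MSB-first at a fixed width. A minimal sketch of driving it with a hand-packed byte (hypothetical harness, not part of the patch; BitPackReader is package-private, so this assumes a caller inside org.apache.hadoop.hive.ql.io.orc):

    package org.apache.hadoop.hive.ql.io.orc;

    // hypothetical example, not part of the patch
    public class BitPackReaderExample {
      public static void main(String[] args) {
        // values 1, 2, 3 at 2 bits each, MSB-first: 01|10|11|00 -> 0x6c
        BitPackReader reader = new BitPackReader(2);
        long[] values = reader.unpack(new byte[] { 0x6c }, 3);
        System.out.println(values[0] + "," + values[1] + "," + values[2]); // 1,2,3
      }
    }

Note that unpack() assigns this.packed but never resets numRead, so reusing one instance on a second buffer reads from the wrong offset; the sketch uses a fresh instance per buffer.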
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +/** + * Writer which bit packs integer values. + */ + +class BitPackWriter { + private byte[] packed; + private int numPacked; + private byte current; + private int bitsLeft; + private int numBits; + + BitPackWriter(int numBits) { + this.packed = null; + this.numPacked = 0; + this.current = 0; + this.bitsLeft = 8; + this.numBits = numBits; + } + + /** + * If there are any left over bits then flush the final byte + */ + private void flush() { + if (bitsLeft != 8) { + writeByte(); + } + } + + private void writeByte() { + packed[numPacked++] = current; + current = 0; + bitsLeft = 8; + } + + /** + * Bit packs the input array for array length of values + * + * @param inp + * - input array + * @param n + * - number of elements in the array to bit pack + * @return bit packed byte array, null returned for any illegal argument + */ + byte[] pack(long[] inp) { + if (inp == null || inp.length == 0 || numBits < 1) { + return null; + } + + return pack(inp, inp.length); + } + + /** + * Bit packs the input array for specified length + * + * @param inp + * - input array + * @param n + * - number of elements in the array to bit pack + * @return bit packed byte array, null returned for any illegal argument + */ + byte[] pack(long[] inp, int n) { + if (inp == null || n < 1 || inp.length == 0 || numBits < 1) { + return null; + } + + int totalBytes = Utils.getTotalBytesRequired(n, numBits); + packed = new byte[totalBytes]; + + for (int i = 0; i < n; i++) { + long value = inp[i]; + int bitsToWrite = numBits; + while (bitsToWrite > bitsLeft) { + // add the bits to the bottom of the current word + current |= value >>> (bitsToWrite - bitsLeft); + // subtract out the bits we just added + bitsToWrite -= bitsLeft; + // zero out the bits above bitsToWrite + value &= (1L << bitsToWrite) - 1; + writeByte(); + } + bitsLeft -= bitsToWrite; + current |= value << bitsLeft; + if (bitsLeft == 0) { + writeByte(); + } + } + + // flush the left over bytes + flush(); + return packed; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixedBitSizes.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixedBitSizes.java new file mode 100644 index 0000000..242f4fb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixedBitSizes.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
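And the round trip through the writer, under the same same-package assumption; Utils.getTotalBytesRequired (added elsewhere in this patch) is expected to return ceil(n * numBits / 8), i.e. one byte for three 2-bit values:

    package org.apache.hadoop.hive.ql.io.orc;

    // hypothetical example, not part of the patch
    public class BitPackRoundTrip {
      public static void main(String[] args) {
        BitPackWriter writer = new BitPackWriter(2);
        byte[] packed = writer.pack(new long[] { 1, 2, 3 });   // -> { 0x6c }
        long[] back = new BitPackReader(2).unpack(packed, 3);  // -> { 1, 2, 3 }
        System.out.println(back[0] + "," + back[1] + "," + back[2]);
      }
    }

(Minor doc nit: the single-argument pack() javadoc documents an n parameter that this overload does not take.)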
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +public enum FixedBitSizes { + ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR; + + /** + * For a given fixed bit this function returns the corresponding ordinal + * @param n + * @return ordinal value + */ + public static int fixedBitsToOrdinal(int n) { + if (n == 0) { + return ONE.ordinal(); + } + + if (n >= 1 && n <= 24) { + return n - 1; + } else if (n > 24 && n <= 26) { + return TWENTYSIX.ordinal(); + } else if (n > 26 && n <= 28) { + return TWENTYEIGHT.ordinal(); + } else if (n > 28 && n <= 30) { + return THIRTY.ordinal(); + } else if (n > 30 && n <= 32) { + return THIRTYTWO.ordinal(); + } else if (n > 32 && n <= 40) { + return FORTY.ordinal(); + } else if (n > 40 && n <= 48) { + return FORTYEIGHT.ordinal(); + } else if (n > 48 && n <= 56) { + return FIFTYSIX.ordinal(); + } else { + return SIXTYFOUR.ordinal(); + } + } + + /** + * For a given ordinal this function returns the corresponding fixed bits + * @param n + * @return fixed bit value + */ + public static int ordinalToFixedBits(int n) { + + if (n >= ONE.ordinal() && n <= TWENTYFOUR.ordinal()) { + return n + 1; + } else if (n == TWENTYSIX.ordinal()) { + return 26; + } else if (n == TWENTYEIGHT.ordinal()) { + return 28; + } else if (n == THIRTY.ordinal()) { + return 30; + } else if (n == THIRTYTWO.ordinal()) { + return 32; + } else if (n == FORTY.ordinal()) { + return 40; + } else if (n == FORTYEIGHT.ordinal()) { + return 48; + } else if (n == FIFTYSIX.ordinal()) { + return 56; + } else { + return 64; + } + } + + /** + * For a given fixed bit this function will return the closest available fixed + * bit + * @param n + * @return closest valid fixed bit + */ + public static int getClosestFixedBits(int n) { + if (n == 0) { + return 1; + } + + if (n >= 1 && n <= 24) { + return n; + } else if (n > 24 && n <= 26) { + return 26; + } else if (n > 26 && n <= 28) { + return 28; + } else if (n > 28 && n <= 30) { + return 30; + } else if (n > 30 && n <= 32) { + return 32; + } else if (n > 32 && n <= 40) { + return 40; + } else if (n > 40 && n <= 48) { + return 48; + } else if (n > 48 && n <= 56) { + return 56; + } else { + return 64; + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionReader.java new file mode 100644 index 0000000..f68d4a7 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionReader.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
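FixedBitSizes exists because the encoding headers below reserve only 5 bits for a bit width, so arbitrary widths are quantized onto the 32 supported sizes (1-24, 26, 28, 30, 32, 40, 48, 56, 64). A few spot checks, assuming the logic as written above:

    package org.apache.hadoop.hive.ql.io.orc;

    // hypothetical example, not part of the patch
    public class FixedBitSizesExample {
      public static void main(String[] args) {
        System.out.println(FixedBitSizes.getClosestFixedBits(0));   // 1: zero still needs one bit
        System.out.println(FixedBitSizes.getClosestFixedBits(17));  // 17: widths 1..24 are exact
        System.out.println(FixedBitSizes.getClosestFixedBits(27));  // 28: rounded up to next size
        System.out.println(FixedBitSizes.ordinalToFixedBits(
            FixedBitSizes.fixedBitsToOrdinal(27)));                 // 28: round-trips via ordinal
      }
    }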
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.io.orc.IntegerCompressionWriter.EncodingType; + +/** + * A reader that reads a sequence of light weight compressed integers. Refer + * {@link IntegerCompressionWriter} for description of various lightweight + * compression techniques. + */ +class IntegerCompressionReader { + private final InStream input; + private final boolean signed; + private final long[] literals = new long[IntegerCompressionWriter.MAX_SCOPE]; + private int numLiterals = 0; + private int used = 0; + + IntegerCompressionReader(InStream input, boolean signed) throws IOException { + this.input = input; + this.signed = signed; + } + + private void readValues() throws IOException { + // read the first 2 bits and determine the encoding type + int firstByte = input.read(); + if (firstByte < 0) { + throw new EOFException("Read past end of RLE integer from " + input); + } else { + int enc = (firstByte >>> 6) & 0x03; + if (EncodingType.SHORT_REPEAT.ordinal() == enc) { + readShortRepeatValues(firstByte); + } else if (EncodingType.DIRECT.ordinal() == enc) { + readDirectValues(firstByte); + } else if (EncodingType.PATCHED_BASE.ordinal() == enc) { + readPatchedBaseValues(firstByte); + } else { + readDeltaValues(firstByte); + } + } + } + + private void readDeltaValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fb = (firstByte >>> 1) & 0x1f; + if (fb != 0) { + fb = FixedBitSizes.ordinalToFixedBits(fb); + } + + // extract the blob run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + + // read the first value stored as vint + long firstVal = 0; + if (signed) { + firstVal = SerializationUtils.readVslong(input); + } else { + firstVal = SerializationUtils.readVulong(input); + } + + // store first value to result buffer + long prevVal = firstVal; + literals[numLiterals++] = firstVal; + + // if fixed bits is 0 then all values have fixed delta + if (fb == 0) { + // read the fixed delta value stored as vint (deltas can be negative even + // if all number are positive) + long fd = SerializationUtils.readVslong(input); + + // add fixed deltas to adjacent values + for (int i = 0; i < len; i++) { + literals[numLiterals++] = literals[numLiterals - 2] + fd; + } + } else { + // number of bytes of data blob + int bytesToRead = Utils.getTotalBytesRequired(len, fb); + + // create buffer space to fill up the blob and unpack the bit packed + // values + byte[] buffer = new byte[bytesToRead]; + for (int i = 0; i < bytesToRead; i++) { + buffer[i] = (byte) input.read(); + } + + // write the unpacked values, zigzag decode the delta values, add + // it to previous value and store final value to result buffer + BitPackReader bpr = new BitPackReader(fb); + long[] adjDeltas = bpr.unpack(buffer, len); + for (int i = 0; i < adjDeltas.length; i++) { + long zzDelta = Utils.zigzagDecode(adjDeltas[i]); + literals[numLiterals++] = prevVal + zzDelta; + prevVal = literals[numLiterals - 1]; + } + } + + } + + private void readPatchedBaseValues(int firstByte) 
throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = FixedBitSizes.ordinalToFixedBits(fbo); + + // extract the run length of data blob + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are always one off + len += 1; + + // extract the number of bytes occupied by base + int thirdByte = input.read(); + int bw = (thirdByte >>> 5) & 0x07; + // base width is one off + bw += 1; + + // extract patch width + int pwo = thirdByte & 0x1f; + int pw = FixedBitSizes.ordinalToFixedBits(pwo); + + // read fourth byte and extract patch gap width + int fourthByte = input.read(); + int pgw = (fourthByte >>> 5) & 0x07; + // patch gap width is one off + pgw += 1; + + // extract the length of the patch list + int pl = fourthByte & 0x1f; + + // read the next base width number of bytes to extract base value + byte[] buffer = new byte[bw]; + for (int i = 0; i < bw; i++) { + buffer[i] = (byte) input.read(); + } + + long base = Utils.bytesToLongBE(buffer); + + // calculate the total bytes of data blob and patch blob + // data blob length = fixed width bits (fb) * run length (len) + // patch blob length = patch length * (patch width + patch gap width) + int blobBytes = Utils.getTotalBytesRequired(len, fb); + int patchBlobBytes = Utils.getTotalBytesRequired(pl, pw + pgw); + + // create buffer space and unpack the bit packed data blob + buffer = new byte[blobBytes]; + for (int i = 0; i < blobBytes; i++) { + buffer[i] = (byte) input.read(); + } + + // create buffer space and unpack the bit packed patch blob + byte[] patchBuffer = new byte[patchBlobBytes]; + for (int i = 0; i < patchBlobBytes; i++) { + patchBuffer[i] = (byte) input.read(); + } + + // unpack the patch blob + BitPackReader bpr = new BitPackReader(pw + pgw); + long[] unpackedPatch = bpr.unpack(patchBuffer, pl); + + // apply the patch directly when decoding the packed data + int patchIdx = 0; + long currGap = 0; + long currPatch = 0; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + long actualGap = 0; + + // special case: gap is >255 then patch value will be 0. + // if gap is <=255 then patch value cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // unpack data blob, patch it (if required), add base and the zigzag + // decode the value to get final result + bpr = new BitPackReader(fb); + long[] unpacked = bpr.unpack(buffer, len); + for (int i = 0; i < unpacked.length; i++) { + if (i == actualGap) { + // apply patch + long patchedVal = unpacked[i] | (currPatch << fb); + + // add base to patched value and zigzag decode it + if (signed) { + literals[numLiterals++] = Utils.zigzagDecode(base + patchedVal); + } else { + literals[numLiterals++] = base + patchedVal; + } + + // increment the patch to point to next entry in patch list + patchIdx++; + + if (patchIdx < pl) { + // read the next gap and patch + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + actualGap = 0; + + // special case: gap is >255 then patch will be 0. 
if gap is + // <=255 then patch cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + } + // add the left over gap + actualGap += currGap; + + // next gap is relative to the current gap + actualGap += i; + } + } else { + // no patching required. add base to unpacked value and zigzag + // decode to get final value + if (signed) { + literals[numLiterals++] = Utils.zigzagDecode(base + unpacked[i]); + } else { + literals[numLiterals++] = base + unpacked[i]; + } + } + } + + } + + private void readDirectValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = FixedBitSizes.ordinalToFixedBits(fbo); + + // extract the run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are one off + len += 1; + + // read the bit packed data blob + int bytesToRead = Utils.getTotalBytesRequired(len, fb); + + // create buffer space to fill up the blob and unpack the values + byte[] buffer = new byte[bytesToRead]; + for (int i = 0; i < bytesToRead; i++) { + buffer[i] = (byte) input.read(); + } + + // write the unpacked values and zigzag decode to result buffer + BitPackReader bpr = new BitPackReader(fb); + long[] unpacked = bpr.unpack(buffer, len); + if (signed) { + for (int i = 0; i < unpacked.length; i++) { + literals[numLiterals++] = Utils.zigzagDecode(unpacked[i]); + } + } else { + for (int i = 0; i < unpacked.length; i++) { + literals[numLiterals++] = unpacked[i]; + } + } + } + + private void readShortRepeatValues(int firstByte) throws IOException { + + // read the number of bytes occupied by the value + int size = (firstByte >>> 3) & 0x07; + // #bytes are one off + size += 1; + + // read the run length + int len = firstByte & 0x07; + // run lengths values are stored only after MIN_REPEAT value is met + len += IntegerCompressionWriter.MIN_REPEAT; + + // read the repeated value which is store using fixed bytes + byte[] buffer = new byte[size]; + for (int i = 0; i < size; i++) { + buffer[i] = (byte) input.read(); + } + + long val = Utils.bytesToLongBE(buffer); + if (signed) { + val = Utils.zigzagDecode(val); + } + + // repeat the value for length times + for (int i = 0; i < len; i++) { + literals[numLiterals++] = val; + } + } + + boolean hasNext() throws IOException { + return used != numLiterals || input.available() > 0; + } + + long next() throws IOException { + long result; + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + result = literals[used++]; + return result; + } + + void seek(PositionProvider index) throws IOException { + input.seek(index); + int consumed = (int) index.getNext(); + if (consumed != 0) { + // a loop is required for cases where we break the run into two + // parts + while (consumed > 0) { + numLiterals = 0; + readValues(); + used = consumed; + consumed -= numLiterals; + } + } else { + used = 0; + numLiterals = 0; + } + } + + void skip(long numValues) throws IOException { + while (numValues > 0) { + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(); + } + long consume = Math.min(numValues, numLiterals - used); + used += consume; + numValues -= consume; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionWriter.java new file mode 100644 index 0000000..0466f74 --- /dev/null +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerCompressionWriter.java @@ -0,0 +1,857 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A writer that performs light weight compression over sequence of integers. + *

+ * <p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bits
+ * required doesn't vary a lot.
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bits
+ * required varies beyond a threshold.
+ * <p>
+ * <b>DELTA:</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
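+ * <p>
+ * For example (per writeShortRepeatValues() below), the unsigned run
+ * [10000, 10000, 10000, 10000, 10000] encodes as three SHORT_REPEAT bytes:
+ * header 0x0a (encoding 0 in the top 2 bits, value width of 2 bytes, run
+ * length 5 - MIN_REPEAT = 2), followed by 10000 as the big endian bytes
+ * 0x27 0x10.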
+ */ +class IntegerCompressionWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private static final int MAX_SHORT_REPEAT_SIZE = 8; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding = null; + private int numLiterals = 0; + private long min = 0; + private long max = 0; + private long[] zigzagLiterals = null; + private long[] zzBaseReduced = null; + private long[] zzAdjDeltas = null; + private long fixedDelta = 0; + private int zzBits90p = 0; + private int zzBits100p = 0; + private int zzBRBits95p = 0; + private int zzBRBits100p = 0; + private int bitsDeltaMax = 0; + private int patchWidth = 0; + private int patchGapWidth = 0; + private int patchLength = 0; + private long[] gapVsPatchList = null; + private long zzMin = 0; + private boolean isFixedDelta = false; + + IntegerCompressionWriter(PositionedOutputStream output, boolean signed) { + this.output = output; + this.signed = signed; + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + // write encoding type in top 2 bits + int enc = encoding.ordinal() << 6; + + int len = 0; + int fb = bitsDeltaMax; + int cfb = 0; + int fbo = 0; + + if (isFixedDelta) { + // if the fixed delta is 0 then the sequence is counted as fixed + // run length else as variable run length + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + cfb = FixedBitSizes.getClosestFixedBits(fb); + // fixed width 0 is used for fixed delta runs. sequence that incur + // only 1 bit to encode will have an additional bit + if (cfb == 1) { + cfb = 2; + } + fbo = FixedBitSizes.fixedBitsToOrdinal(cfb) << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = enc | fbo | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + SerializationUtils.writeVslong(output, literals[0]); + } else { + SerializationUtils.writeVulong(output, literals[0]); + } + + // if delta is fixed then we don't need to store delta blob else store + // delta blob using fixed bit packing + if (isFixedDelta) { + // store the fixed delta using zigzag encoding. 
deltas can be negative + // even if all values are positive + SerializationUtils.writeVslong(output, fixedDelta); + } else { + // bit pack the delta blob + BitPackWriter bpw = new BitPackWriter(cfb); + byte[] packed = bpw.pack(zzAdjDeltas, zzAdjDeltas.length); + for (int i = 0; i < packed.length; i++) { + output.write(packed[i]); + } + } + } + + private void writePatchedBaseValues() throws IOException { + // write encoding type in top 2 bits + int enc = encoding.ordinal() << 6; + + // write the number of fixed bits required in next 5 bits + int fb = zzBRBits95p; + int fbc = FixedBitSizes.getClosestFixedBits(fb); + int fbo = FixedBitSizes.fixedBitsToOrdinal(fbc) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = enc | fbo | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + int headerSecondByte = variableRunLength & 0xff; + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width + int baseWidth = Utils.findNumBits(zzMin); + baseWidth = FixedBitSizes.getClosestFixedBits(baseWidth); + int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1; + int bb = (baseBytes - 1) << 5; + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + int headerThirdByte = bb | FixedBitSizes.fixedBitsToOrdinal(patchWidth); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for (int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((zzMin >>> (i * 8)) & 0xff); + output.write(b); + } + + // bit packing the delta values and write each bytes + int closestFixedBits = FixedBitSizes.getClosestFixedBits(zzBRBits95p); + BitPackWriter bpw = new BitPackWriter(closestFixedBits); + byte[] packed = bpw.pack(zzBaseReduced, numLiterals); + for (int i = 0; i < packed.length; i++) { + output.write(packed[i]); + } + + // write the patch blob + closestFixedBits = FixedBitSizes.getClosestFixedBits(patchGapWidth + + patchWidth); + bpw = new BitPackWriter(closestFixedBits); + packed = bpw.pack(gapVsPatchList, patchLength); + for (int i = 0; i < packed.length; i++) { + output.write(packed[i]); + } + + // reset run length + variableRunLength = 0; + } + + private void writeDirectValues() throws IOException { + // write encoding type in top 2 bits + int enc = encoding.ordinal() << 6; + + // write the number of fixed bits required in next 5 bits + int cfb = zzBits100p; + int fbo = FixedBitSizes.fixedBitsToOrdinal(cfb) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + int headerFirstByte = enc | fbo | tailBits; + + // second byte of the header stores the remaining 8 bits of + // runlength + int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit packing the delta values and write each bytes + BitPackWriter bpw = new BitPackWriter(cfb); + byte[] packed = 
bpw.pack(zigzagLiterals, zigzagLiterals.length); + for (int i = 0; i < packed.length; i++) { + output.write(packed[i]); + } + + // reset run length + variableRunLength = 0; + } + + private void writeShortRepeatValues() throws IOException { + // System.out.println("SHORT_REPEAT: " + numLiterals + " Literals: " + // + Longs.asList(Arrays.copyOf(literals, numLiterals))); + + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = Utils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + int numBitsRepeatVal = Utils.findNumBits(repeatVal); + numBitsRepeatVal = FixedBitSizes.getClosestFixedBits(numBitsRepeatVal); + int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // if the runs are long or too short and if the delta is non zero, then + // choose a different algorithm + if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH + && numBytesRepeatVal <= MAX_SHORT_REPEAT_SIZE && prevDelta == 0) { + // write encoding type in top 2 bits + int header = encoding.ordinal() << 6; + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the payload (i.e. the repeat value) in big endian + for (int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } else { + determineEncoding(); + writeValues(); + } + } + + private void determineEncoding() { + // used for direct encoding + zigzagLiterals = new long[numLiterals]; + + // used for patched base encoding + zzBaseReduced = new long[numLiterals]; + + // used for delta encoding + zzAdjDeltas = new long[numLiterals - 1]; + + int idx = 0; + + // for identifying monotonic sequences + boolean isIncreasing = false; + int increasingCount = 1; + boolean isDecreasing = false; + int decreasingCount = 1; + + // for identifying type of delta encoding + long currMin = literals[0]; + long currMax = literals[0]; + isFixedDelta = true; + long currDelta = 0; + + if (signed) { + zzMin = Utils.zigzagEncode(literals[0]); + } else { + zzMin = literals[0]; + } + long deltaMax = 0; + + // populate all variables to identify the encoding type + if (numLiterals >= 1) { + currDelta = literals[1] - literals[0]; + for (int i = 0; i < numLiterals; i++) { + if (i > 0 && literals[i] >= currMax) { + currMax = literals[i]; + increasingCount++; + } + + if (i > 0 && literals[i] <= currMin) { + currMin = literals[i]; + decreasingCount++; + } + + // if delta doesn't changes then mark it as fixed delta + if (i > 0 && isFixedDelta) { + if (literals[i] - literals[i - 1] != currDelta) { + isFixedDelta = false; + } + + fixedDelta = currDelta; + } + + // store the minimum value among zigzag encoded values. 
The min + // value (base) will be removed in patched base encoding + long zzEncVal = 0; + if (signed) { + zzEncVal = Utils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + if (zzEncVal < zzMin) { + zzMin = zzEncVal; + } + zigzagLiterals[idx] = zzEncVal; + idx++; + + // max delta value is required for computing the fixed bits + // required for delta blob in delta encoding + if (i > 0) { + zzAdjDeltas[i - 1] = Utils.zigzagEncode(literals[i] - literals[i - 1]); + if (zzAdjDeltas[i - 1] > deltaMax) { + deltaMax = zzAdjDeltas[i - 1]; + } + } + } + + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = Utils.findNumBits(deltaMax); + + // if decreasing count equals total number of literals then the + // sequence is monotonically decreasing + if (increasingCount == 1 && decreasingCount == numLiterals) { + isDecreasing = true; + } + + // if increasing count equals total number of literals then the + // sequence is monotonically increasing + if (decreasingCount == 1 && increasingCount == numLiterals) { + isIncreasing = true; + } + } + + // if the sequence is both increasing and decreasing then it is not + // monotonic + if (isDecreasing && isIncreasing) { + isDecreasing = false; + isIncreasing = false; + } + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct or delta encoding + + // percentile is called multiple times and so we will sort the array + // once and reuse it + long[] sortedZigzag = Arrays.copyOf(zigzagLiterals, zigzagLiterals.length); + Arrays.sort(sortedZigzag); + double p = 0.9; + long p90Val = (long) Utils.percentile(sortedZigzag, p, true); + zzBits90p = Utils.findNumBits(p90Val); + zzBits90p = FixedBitSizes.getClosestFixedBits(zzBits90p); + + p = 1.0; + long p100Val = (long) Utils.percentile(sortedZigzag, p, true); + zzBits100p = Utils.findNumBits(p100Val); + zzBits100p = FixedBitSizes.getClosestFixedBits(zzBits100p); + + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1 + && isFixedDelta == false) { + // patching is done only on base reduces values. 
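+ // Illustration with made-up numbers: if the base-reduced values were
+ // [2, 1, 0, 530] with a 95th percentile width of 2 bits and a 100th
+ // percentile width of 10 bits, preparePatchedBlob() would split the
+ // outlier: the low 2 bits (530 & 0x3 = 2) stay in the data blob and
+ // the high bits (530 >>> 2 = 132) go to the patch list together with
+ // the gap to the patched position.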
+ // remove base from the zigzag encoded values + for (int i = 0; i < zigzagLiterals.length; i++) { + zzBaseReduced[i] = zigzagLiterals[i] - zzMin; + } + + // percentile is computed multiple times, so sort and reuse + long[] sortedZZBaseReduced = Arrays.copyOf(zzBaseReduced, + zzBaseReduced.length); + Arrays.sort(sortedZZBaseReduced); + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + p = 0.95; + long p95Val = (long) Utils.percentile(sortedZZBaseReduced, p, true); + zzBRBits95p = Utils.findNumBits(p95Val); + zzBRBits95p = FixedBitSizes.getClosestFixedBits(zzBRBits95p); + + // 100th percentile is used to compute the max patch width + p = 1.0; + p100Val = (long) Utils.percentile(sortedZZBaseReduced, p, true); + zzBRBits100p = Utils.findNumBits(p100Val); + zzBRBits100p = FixedBitSizes.getClosestFixedBits(zzBRBits100p); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. The reason for this is max value + // (beyond which patching is done) is based on 95th percentile + // base reduced value. + if ((zzBRBits100p - zzBRBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } + + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. Hence we will fallback to direct + if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 + && isFixedDelta == false) { + encoding = EncodingType.DIRECT; + return; + } + + if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 + && isFixedDelta == true) { + encoding = EncodingType.DELTA; + return; + } + + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + + if (encoding == null) { + throw new RuntimeException("Integer encoding cannot be determined."); + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + int mask = (1 << zzBRBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + patchLength = (int) Math.ceil((zzBaseReduced.length * 0.05)); + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = zzBRBits100p - zzBRBits95p; + patchWidth = FixedBitSizes.getClosestFixedBits(patchWidth); + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for (int i = 0; i < zzBaseReduced.length; i++) { + // if value is above mask then create the patch and record the gap + if (zzBaseReduced[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask + long patch = zzBaseReduced[i] >>> zzBRBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + zzBaseReduced[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + 
patchGapWidth = Utils.findNumBits(maxGap); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. To deal with this case, we will save + // two entries in final patch list with following entries + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for (int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255 << patchWidth) | 0; + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + min = 0; + max = 0; + encoding = null; + prevDelta = 0; + zigzagLiterals = null; + zzBaseReduced = null; + zzAdjDeltas = null; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + zzBRBits95p = 0; + zzBRBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + zzMin = 0; + isFixedDelta = false; + } + + void flush() throws IOException { + // if only one element is left in buffer then use short repeat encoding + if (numLiterals == 1) { + encoding = EncodingType.SHORT_REPEAT; + fixedRunLength = 1; + writeValues(); + } + + // if variable runs are not 0 then determine the delta encoding to use + // and flush out the buffer + if (variableRunLength != 0 && numLiterals != 0) { + determineEncoding(); + writeValues(); + } + + // if fixed runs are not 0 then flush out the buffer + if (fixedRunLength != 0 && numLiterals != 0) { + // encoding = EncodingType.SHORT_REPEAT; + if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } + } + + output.flush(); + } + + void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // update min and max values + if (val <= min) { + min = val; + } + + if (val >= max) { + max = val; + } + + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = 
Arrays.copyOfRange(literals, numLiterals, + numLiterals + MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for (long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and if satisfies the + // short repeat algorithm conditions then write the values + // as short repeats else determine the appropriate algorithm + // to persist the values + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + determineEncoding(); + writeValues(); + } + } + + // if fixed run length is 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reach the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + min = val; + max = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 06103e3..3278f92 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -114,14 +114,18 @@ public long getNext() { protected final int columnId; private BitFieldReader present = null; protected boolean valuePresent = false; + protected boolean isDirectV2; TreeReader(Path path, int columnId) { this.path = path; this.columnId = columnId; + // FIXME: make it user configurable? 
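+ // Defaults to DIRECT_V2; startStripe() below flips this back to false
+ // when the stripe footer reports plain DIRECT, so files written with
+ // the old RLE remain readable.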
+ this.isDirectV2 = true; } void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { throw new IOException("Unknown encoding " + encoding + " in column " + columnId + " of " + path); } @@ -130,6 +134,9 @@ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { void startStripe(Map streams, List encoding ) throws IOException { + if(encoding.get(columnId).getKind() == OrcProto.ColumnEncoding.Kind.DIRECT) { + this.isDirectV2 = false; + } checkEncoding(encoding.get(columnId)); InStream in = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.PRESENT)); @@ -264,6 +271,7 @@ void skipRows(long items) throws IOException { private static class ShortTreeReader extends TreeReader{ private RunLengthIntegerReader reader = null; + private IntegerCompressionReader readerV2 = null; ShortTreeReader(Path path, int columnId) { super(path, columnId); @@ -276,13 +284,21 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (super.isDirectV2) { + readerV2 = new IntegerCompressionReader(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - reader.seek(index[columnId]); + if (reader != null) { + reader.seek(index[columnId]); + } else { + readerV2.seek(index[columnId]); + } } @Override @@ -295,19 +311,28 @@ Object next(Object previous) throws IOException { } else { result = (ShortWritable) previous; } - result.set((short) reader.next()); + if (reader != null) { + result.set((short) reader.next()); + } else { + result.set((short) readerV2.next()); + } } return result; } @Override void skipRows(long items) throws IOException { - reader.skip(countNonNulls(items)); + if (reader != null) { + reader.skip(countNonNulls(items)); + } else { + readerV2.skip(countNonNulls(items)); + } } } private static class IntTreeReader extends TreeReader{ private RunLengthIntegerReader reader = null; + private IntegerCompressionReader readerV2 = null; IntTreeReader(Path path, int columnId) { super(path, columnId); @@ -320,13 +345,21 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (isDirectV2) { + readerV2 = new IntegerCompressionReader(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - reader.seek(index[columnId]); + if (reader != null) { + reader.seek(index[columnId]); + } else { + readerV2.seek(index[columnId]); + } } @Override @@ -339,19 +372,28 @@ Object next(Object previous) throws IOException { } else { result = (IntWritable) previous; } - result.set((int) reader.next()); + if (reader != null) { + result.set((int) reader.next()); + } else { + result.set((int) readerV2.next()); + } } return result; } @Override void skipRows(long items) throws IOException { - reader.skip(countNonNulls(items)); + if (reader != null) { + reader.skip(countNonNulls(items)); + } else { + 
readerV2.skip(countNonNulls(items)); + } } } private static class LongTreeReader extends TreeReader{ private RunLengthIntegerReader reader = null; + private IntegerCompressionReader readerV2 = null; LongTreeReader(Path path, int columnId) { super(path, columnId); @@ -364,13 +406,21 @@ void startStripe(Map streams, super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + if (isDirectV2) { + readerV2 = new IntegerCompressionReader(streams.get(name), true); + } else { + reader = new RunLengthIntegerReader(streams.get(name), true); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - reader.seek(index[columnId]); + if (reader != null) { + reader.seek(index[columnId]); + } else { + readerV2.seek(index[columnId]); + } } @Override @@ -383,14 +433,22 @@ Object next(Object previous) throws IOException { } else { result = (LongWritable) previous; } - result.set(reader.next()); + if (reader != null) { + result.set(reader.next()); + } else { + result.set(readerV2.next()); + } } return result; } @Override void skipRows(long items) throws IOException { - reader.skip(countNonNulls(items)); + if (reader != null) { + reader.skip(countNonNulls(items)); + } else { + readerV2.skip(countNonNulls(items)); + } } } @@ -489,7 +547,8 @@ void skipRows(long items) throws IOException { private static class BinaryTreeReader extends TreeReader{ private InStream stream; - private RunLengthIntegerReader lengths; + private RunLengthIntegerReader lengths = null; + private IntegerCompressionReader lengthsV2 = null; BinaryTreeReader(Path path, int columnId) { super(path, columnId); @@ -503,16 +562,26 @@ void startStripe(Map streams, StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), - false); + if (isDirectV2) { + lengthsV2 = new IntegerCompressionReader(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); stream.seek(index[columnId]); - lengths.seek(index[columnId]); + if (lengths != null) { + lengths.seek(index[columnId]); + } else { + lengthsV2.seek(index[columnId]); + } } @Override @@ -525,7 +594,12 @@ Object next(Object previous) throws IOException { } else { result = (BytesWritable) previous; } - int len = (int) lengths.next(); + int len = 0; + if (lengths != null) { + len = (int) lengths.next(); + } else { + len = (int) lengthsV2.next(); + } result.setSize(len); int offset = 0; while (len > 0) { @@ -544,16 +618,24 @@ Object next(Object previous) throws IOException { void skipRows(long items) throws IOException { items = countNonNulls(items); long lengthToSkip = 0; - for(int i=0; i < items; ++i) { - lengthToSkip += lengths.next(); + if (lengths != null) { + for (int i = 0; i < items; ++i) { + lengthToSkip += lengths.next(); + } + } else { + for (int i = 0; i < items; ++i) { + lengthToSkip += lengthsV2.next(); + } } stream.skip(lengthToSkip); } } private static class TimestampTreeReader extends TreeReader{ - private RunLengthIntegerReader data; - private RunLengthIntegerReader nanos; + private RunLengthIntegerReader data = null; + private RunLengthIntegerReader 
nanos = null; + private IntegerCompressionReader dataV2 = null; + private IntegerCompressionReader nanosV2 = null; TimestampTreeReader(Path path, int columnId) { super(path, columnId); @@ -564,17 +646,32 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.DATA)), true); - nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.SECONDARY)), false); + if (isDirectV2) { + dataV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanosV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); + } else { + data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - data.seek(index[columnId]); - nanos.seek(index[columnId]); + if (data != null) { + data.seek(index[columnId]); + } else { + dataV2.seek(index[columnId]); + } + if (nanos != null) { + nanos.seek(index[columnId]); + } else { + nanosV2.seek(index[columnId]); + } } @Override @@ -587,9 +684,21 @@ Object next(Object previous) throws IOException { } else { result = (Timestamp) previous; } - long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * - WriterImpl.MILLIS_PER_SECOND; - int newNanos = parseNanos(nanos.next()); + long millis = 0; + int newNanos = 0; + if (data != null) { + millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * + WriterImpl.MILLIS_PER_SECOND; + } else { + millis = (dataV2.next() + WriterImpl.BASE_TIMESTAMP) * + WriterImpl.MILLIS_PER_SECOND; + } + + if (nanos != null) { + newNanos = parseNanos(nanos.next()); + } else { + newNanos = parseNanos(nanosV2.next()); + } // fix the rounding when we divided by 1000. 
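// (A note on the value decoded just above, assuming the scheme implemented
// by formatNanos/parseNanos in this pair of files: the SECONDARY stream does
// not carry raw nanoseconds. The writer strips up to seven trailing decimal
// zeros and records their count, minus two, in the low three bits, so that
// round values serialize small. A sketch of the round trip:
//
//   nanos = 500000000                       -> strip zeros: 50, zeros = 7
//   serialized = (50 << 3) | (7 - 2) = 405
//   parseNanos(405) = 50 * 10^7 = 500000000
//
// Small serialized values keep both integer encodings effective.)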
if (millis >= 0) { millis += newNanos / 1000000; @@ -616,14 +725,23 @@ private static int parseNanos(long serialized) { @Override void skipRows(long items) throws IOException { items = countNonNulls(items); - data.skip(items); - nanos.skip(items); + if (data != null) { + data.skip(items); + } else { + dataV2.skip(items); + } + if (nanos != null) { + nanos.skip(items); + } else { + nanosV2.skip(items); + } } } private static class DecimalTreeReader extends TreeReader{ private InStream valueStream; - private RunLengthIntegerReader scaleStream; + private RunLengthIntegerReader scaleStream = null; + private IntegerCompressionReader scaleStreamV2 = null; DecimalTreeReader(Path path, int columnId) { super(path, columnId); @@ -636,23 +754,37 @@ void startStripe(Map streams, super.startStripe(streams, encodings); valueStream = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA)); - scaleStream = new RunLengthIntegerReader(streams.get( - new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + if (isDirectV2) { + scaleStreamV2 = new IntegerCompressionReader(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } else { + scaleStream = new RunLengthIntegerReader(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); valueStream.seek(index[columnId]); - scaleStream.seek(index[columnId]); + if (scaleStream != null) { + scaleStream.seek(index[columnId]); + } else { + scaleStreamV2.seek(index[columnId]); + } } @Override Object next(Object previous) throws IOException { super.next(previous); if (valuePresent) { - return new HiveDecimal(SerializationUtils.readBigInteger(valueStream), - (int) scaleStream.next()); + if (scaleStream != null) { + return new HiveDecimal(SerializationUtils.readBigInteger(valueStream), + (int) scaleStream.next()); + } else { + return new HiveDecimal(SerializationUtils.readBigInteger(valueStream), + (int) scaleStreamV2.next()); + } } return null; } @@ -663,7 +795,11 @@ void skipRows(long items) throws IOException { for(int i=0; i < items; i++) { SerializationUtils.readBigInteger(valueStream); } - scaleStream.skip(items); + if (scaleStream != null) { + scaleStream.skip(items); + } else { + scaleStreamV2.skip(items); + } } } @@ -671,7 +807,8 @@ void skipRows(long items) throws IOException { private DynamicByteArray dictionaryBuffer = null; private int dictionarySize; private int[] dictionaryOffsets; - private RunLengthIntegerReader reader; + private RunLengthIntegerReader reader = null; + private IntegerCompressionReader readerV2 = null; StringTreeReader(Path path, int columnId) { super(path, columnId); @@ -707,27 +844,44 @@ void startStripe(Map streams, name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); in = streams.get(name); RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); + IntegerCompressionReader lenReaderV2 = new IntegerCompressionReader(in, false); int offset = 0; if (dictionaryOffsets == null || dictionaryOffsets.length < dictionarySize + 1) { dictionaryOffsets = new int[dictionarySize + 1]; } - for(int i=0; i < dictionarySize; ++i) { - dictionaryOffsets[i] = offset; - offset += (int) lenReader.next(); + // for performance reasons checking it outside of loop + if (isDirectV2) { + for (int i = 0; i < dictionarySize; ++i) { + dictionaryOffsets[i] = offset; + offset += (int) lenReaderV2.next(); + } + } else { + for (int i = 0; i < dictionarySize; ++i) { + 
dictionaryOffsets[i] = offset; + offset += (int) lenReader.next(); + } } dictionaryOffsets[dictionarySize] = offset; in.close(); // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), false); + if (isDirectV2) { + readerV2 = new IntegerCompressionReader(streams.get(name), false); + } else { + reader = new RunLengthIntegerReader(streams.get(name), false); + } } @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - reader.seek(index[columnId]); + if (reader != null) { + reader.seek(index[columnId]); + } else { + readerV2.seek(index[columnId]); + } } @Override @@ -735,7 +889,12 @@ Object next(Object previous) throws IOException { super.next(previous); Text result = null; if (valuePresent) { - int entry = (int) reader.next(); + int entry = 0; + if (reader != null) { + entry = (int) reader.next(); + } else { + entry = (int) readerV2.next(); + } if (previous == null) { result = new Text(); } else { @@ -764,7 +923,11 @@ Object next(Object previous) throws IOException { @Override void skipRows(long items) throws IOException { - reader.skip(countNonNulls(items)); + if (reader != null) { + reader.skip(countNonNulls(items)); + } else { + readerV2.skip(countNonNulls(items)); + } } } @@ -919,7 +1082,8 @@ void skipRows(long items) throws IOException { private static class ListTreeReader extends TreeReader { private final TreeReader elementReader; - private RunLengthIntegerReader lengths; + private RunLengthIntegerReader lengths = null; + private IntegerCompressionReader lengthsV2 = null; ListTreeReader(Path path, int columnId, List types, @@ -933,7 +1097,11 @@ void skipRows(long items) throws IOException { @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - lengths.seek(index[columnId]); + if (lengths != null) { + lengths.seek(index[columnId]); + } else { + lengthsV2.seek(index[columnId]); + } elementReader.seek(index); } @@ -949,7 +1117,12 @@ Object next(Object previous) throws IOException { result = (ArrayList) previous; } int prevLength = result.size(); - int length = (int) lengths.next(); + int length = 0; + if (lengths != null) { + length = (int) lengths.next(); + } else { + length = (int) lengthsV2.next(); + } // extend the list to the new length for(int i=prevLength; i < length; ++i) { result.add(null); @@ -972,8 +1145,13 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + if (isDirectV2) { + lengthsV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } if (elementReader != null) { elementReader.startStripe(streams, encodings); } @@ -983,8 +1161,14 @@ void startStripe(Map streams, void skipRows(long items) throws IOException { items = countNonNulls(items); long childSkip = 0; - for(long i=0; i < items; ++i) { - childSkip += lengths.next(); + if (lengths != null) { + for (long i = 0; i < items; ++i) { + childSkip += lengths.next(); + } + } else { + for (long i = 0; i < items; ++i) { + childSkip += lengthsV2.next(); + } } elementReader.skipRows(childSkip); } @@ -993,7 +1177,8 @@ void skipRows(long items) throws IOException { private static class MapTreeReader extends TreeReader { 
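// (Maps reuse the list pattern above, per the layout this patch reads: the
// LENGTH stream holds one entry count per row, and the key and value
// children are stored flattened across rows. That is why the skipRows()
// implementations sum the lengths before skipping children. For three rows
// with sizes 2, 0 and 3:
//
//   LENGTH stream: 2, 0, 3
//   keyReader and valueReader each hold 2 + 0 + 3 = 5 flattened values
//   skipRows(3)  ->  childSkip = 5 for both children)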
private final TreeReader keyReader; private final TreeReader valueReader; - private RunLengthIntegerReader lengths; + private RunLengthIntegerReader lengths = null; + private IntegerCompressionReader lengthsV2 = null; MapTreeReader(Path path, int columnId, @@ -1018,7 +1203,11 @@ void skipRows(long items) throws IOException { @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); - lengths.seek(index[columnId]); + if (lengths != null) { + lengths.seek(index[columnId]); + } else { + lengthsV2.seek(index[columnId]); + } keyReader.seek(index); valueReader.seek(index); } @@ -1036,7 +1225,12 @@ Object next(Object previous) throws IOException { } // for now just clear and create new objects result.clear(); - int length = (int) lengths.next(); + int length = 0; + if (lengths != null) { + length = (int) lengths.next(); + } else { + length = (int) lengthsV2.next(); + } // read the new elements into the array for(int i=0; i< length; i++) { result.put(keyReader.next(null), valueReader.next(null)); @@ -1050,8 +1244,13 @@ void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + if (isDirectV2) { + lengthsV2 = new IntegerCompressionReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } else { + lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); + } if (keyReader != null) { keyReader.startStripe(streams, encodings); } @@ -1064,8 +1263,14 @@ void startStripe(Map streams, void skipRows(long items) throws IOException { items = countNonNulls(items); long childSkip = 0; - for(long i=0; i < items; ++i) { - childSkip += lengths.next(); + if (lengths != null) { + for (long i = 0; i < items; ++i) { + childSkip += lengths.next(); + } + } else { + for (long i = 0; i < items; ++i) { + childSkip += lengthsV2.next(); + } } keyReader.skipRows(childSkip); valueReader.skipRows(childSkip); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Utils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Utils.java new file mode 100644 index 0000000..08d4f73 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Utils.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.Arrays;
+import java.util.Random;
+
+final class Utils {
+
+  // utility class; prevent instantiation
+  private Utils() {
+  }
+
+  /**
+   * Count the number of bits required to encode the given value
+   *
+   * @param value
+   *          - value to encode, assumed non-negative (e.g. zigzag encoded)
+   * @return bits required to store value
+   */
+  static int findNumBits(long value) {
+    int count = 0;
+    while (value > 0) {
+      count++;
+      value = value >>> 1;
+    }
+    return count;
+  }
+
+  /**
+   * zigzag encode the given value
+   *
+   * @param val
+   * @return zigzag encoded value
+   */
+  static long zigzagEncode(long val) {
+    return (val << 1) ^ (val >> 63);
+  }
+
+  /**
+   * zigzag decode the given value
+   *
+   * @param val
+   * @return zigzag decoded value
+   */
+  static long zigzagDecode(long val) {
+    return (val >>> 1) ^ -(val & 1);
+  }
+
+  /**
+   * Next random long value in [0, n)
+   *
+   * @param rng
+   *          - random number generator
+   * @param n
+   *          - exclusive upper bound
+   * @return random long value
+   */
+  static long nextLong(Random rng, long n) {
+    long bits, val;
+    do {
+      bits = (rng.nextLong() << 1) >>> 1;
+      val = bits % n;
+    } while (bits - val + (n - 1) < 0L);
+    return val;
+  }
+
+  /**
+   * Compute the specified percentile value for an array
+   *
+   * @param data
+   *          - array
+   * @param p
+   *          - percentile value (> 0.0 and <= 1.0)
+   * @param sorted
+   *          - whether the array is already sorted
+   * @return percentile value
+   */
+  static double percentile(long[] data, double p, boolean sorted) {
+    if ((p > 1.0) || (p <= 0.0)) {
+      throw new IllegalArgumentException("invalid percentile value: " + p);
+    }
+
+    long[] input = data;
+
+    if (!sorted) {
+      input = Arrays.copyOf(data, data.length);
+      Arrays.sort(input);
+    }
+
+    int n = input.length;
+    int idx = (int) Math.floor((n + 1) * p);
+    if (idx >= n) {
+      return input[n - 1];
+    }
+
+    if (idx < 1) {
+      return input[0];
+    }
+    long lower = input[idx - 1];
+    long upper = input[idx];
+    double diff = ((n + 1) * p) - idx;
+    return lower + diff * (upper - lower);
+  }
+
+  /**
+   * Convert byte array to long
+   *
+   * @param b
+   *          - byte array in big-endian order (at most 8 bytes)
+   * @return long value
+   */
+  static long bytesToLongBE(byte[] b) {
+    long out = 0;
+
+    int offset = b.length - 1;
+    for (int i = 0; i < b.length; i++) {
+      long val = 0xff & b[i];
+      out |= (val << (offset * 8));
+      offset -= 1;
+    }
+
+    return out;
+  }
+
+  /**
+   * Calculate the number of bytes required
+   *
+   * @param n
+   *          - number of values
+   * @param numBits
+   *          - bit width
+   * @return number of bytes required
+   */
+  static int getTotalBytesRequired(int n, int numBits) {
+    // promote before multiplying so a large n * numBits cannot overflow int
+    return (int) Math.ceil((double) n * numBits / 8.0);
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 52defb9..df99dc8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -364,6 +364,7 @@ public boolean isCompressed() {
     private final PositionedOutputStream rowIndexStream;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
+    protected final boolean useDirectV2Encoding;

     /**
      * Create a tree writer.
@@ -379,6 +380,9 @@ public boolean isCompressed() {
       this.isCompressed = streamFactory.isCompressed();
       this.id = columnId;
       this.inspector = inspector;
+      // make direct v2 encoding the default
+      // FIXME: do we need to make this user configurable?
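// (For reference, Utils.zigzagEncode above interleaves negative and
// positive values so the packed bit width tracks magnitude rather than
// sign. Checking the mapping (n << 1) ^ (n >> 63) on small inputs:
//
//    0 -> 0,  -1 -> 1,  1 -> 2,  -2 -> 3,  2 -> 4
//
// and zigzagDecode, (n >>> 1) ^ -(n & 1), inverts it exactly.)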
+ this.useDirectV2Encoding = true; if (nullable) { isPresentOutStream = streamFactory.createStream(id, OrcProto.Stream.Kind.PRESENT); @@ -493,6 +497,10 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, * @return the information about the encoding of this column */ OrcProto.ColumnEncoding getEncoding() { + if (useDirectV2Encoding) { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } return OrcProto.ColumnEncoding.newBuilder().setKind( OrcProto.ColumnEncoding.Kind.DIRECT).build(); } @@ -619,6 +627,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class IntegerTreeWriter extends TreeWriter { private final RunLengthIntegerWriter writer; + private final IntegerCompressionWriter writerV2; private final ShortObjectInspector shortInspector; private final IntObjectInspector intInspector; private final LongObjectInspector longInspector; @@ -630,7 +639,13 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.writer = new RunLengthIntegerWriter(out, true); + if (super.useDirectV2Encoding) { + this.writer = null; + this.writerV2 = new IntegerCompressionWriter(out, true); + } else { + this.writer = new RunLengthIntegerWriter(out, true); + this.writerV2 = null; + } if (inspector instanceof IntObjectInspector) { intInspector = (IntObjectInspector) inspector; shortInspector = null; @@ -661,7 +676,11 @@ void write(Object obj) throws IOException { val = shortInspector.get(obj); } indexStatistics.updateInteger(val); - writer.write(val); + if (writer != null) { + writer.write(val); + } else { + writerV2.write(val); + } } } @@ -669,14 +688,22 @@ void write(Object obj) throws IOException { void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - writer.flush(); + if (writer != null) { + writer.flush(); + } else { + writerV2.flush(); + } recordPosition(rowIndexPosition); } @Override void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); - writer.getPosition(recorder); + if (writer != null) { + writer.getPosition(recorder); + } else { + writerV2.getPosition(recorder); + } } } @@ -761,6 +788,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final PositionedOutputStream stringOutput; private final RunLengthIntegerWriter lengthOutput; private final RunLengthIntegerWriter rowOutput; + private final IntegerCompressionWriter lengthOutputV2; + private final IntegerCompressionWriter rowOutputV2; private final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); private final DynamicIntArray rows = new DynamicIntArray(); @@ -776,10 +805,21 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); - lengthOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false); - rowOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), false); + if (super.useDirectV2Encoding == true) { + lengthOutput = null; + rowOutput = null; + lengthOutputV2 = new IntegerCompressionWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + rowOutputV2 = new 
IntegerCompressionWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), false); + } else { + lengthOutput = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + rowOutput = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), false); + lengthOutputV2 = null; + rowOutputV2 = null; + } recordPosition(rowIndexPosition); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); @@ -808,7 +848,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, public void visit(StringRedBlackTree.VisitorContext context ) throws IOException { context.writeBytes(stringOutput); - lengthOutput.write(context.getLength()); + if (lengthOutput != null) { + lengthOutput.write(context.getLength()); + } else { + lengthOutputV2.write(context.getLength()); + } dumpOrder[context.getOriginalPosition()] = currentId++; } }); @@ -824,20 +868,37 @@ public void visit(StringRedBlackTree.VisitorContext context rowIndexEntry < savedRowIndex.size()) { OrcProto.RowIndexEntry.Builder base = savedRowIndex.get(rowIndexEntry++).toBuilder(); - rowOutput.getPosition(new RowIndexPositionRecorder(base)); + if (rowOutput != null) { + rowOutput.getPosition(new RowIndexPositionRecorder(base)); + } else { + rowOutputV2.getPosition(new RowIndexPositionRecorder(base)); + } rowIndex.addEntry(base.build()); } } if (i != length) { - rowOutput.write(dumpOrder[rows.get(i)]); + if (rowOutput != null) { + rowOutput.write(dumpOrder[rows.get(i)]); + } else { + rowOutputV2.write(dumpOrder[rows.get(i)]); + } } } // we need to build the rowindex before calling super, since it // writes it out. super.writeStripe(builder, requiredIndexEntries); stringOutput.flush(); - lengthOutput.flush(); - rowOutput.flush(); + if (lengthOutput != null) { + lengthOutput.flush(); + } else { + lengthOutputV2.flush(); + } + + if (rowOutput != null) { + rowOutput.flush(); + } else { + rowOutputV2.flush(); + } // reset all of the fields to be ready for the next stripe. 
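// (Every tree writer in this patch repeats the same "v1 field non-null?"
// dispatch between RunLengthIntegerWriter and IntegerCompressionWriter.
// A later cleanup could hide the choice behind a small interface; a
// hypothetical sketch, not part of this patch:
//
//   interface IntegerWriter {
//     void write(long value) throws IOException;
//     void flush() throws IOException;
//     void getPosition(PositionRecorder recorder) throws IOException;
//   }
//
// With both writers implementing it, each call site would keep a single
// field and the if/else pairs would disappear.)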
dictionary.clear(); rows.clear(); @@ -882,6 +943,7 @@ long estimateMemory() { private static class BinaryTreeWriter extends TreeWriter { private final PositionedOutputStream stream; private final RunLengthIntegerWriter length; + private final IntegerCompressionWriter lengthV2; BinaryTreeWriter(int columnId, ObjectInspector inspector, @@ -890,8 +952,15 @@ long estimateMemory() { super(columnId, inspector, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); - this.length = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false); + if (super.useDirectV2Encoding) { + this.lengthV2 = new IntegerCompressionWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + this.length = null; + } else { + this.length = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false); + this.lengthV2 = null; + } recordPosition(rowIndexPosition); } @@ -902,7 +971,11 @@ void write(Object obj) throws IOException { BytesWritable val = ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj); stream.write(val.getBytes(), 0, val.getLength()); - length.write(val.getLength()); + if (length != null) { + length.write(val.getLength()); + } else { + lengthV2.write(val.getLength()); + } } } @@ -911,7 +984,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); stream.flush(); - length.flush(); + if (length != null) { + length.flush(); + } else { + lengthV2.flush(); + } recordPosition(rowIndexPosition); } @@ -919,7 +996,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); stream.getPosition(recorder); - length.getPosition(recorder); + if (length != null) { + length.getPosition(recorder); + } else { + lengthV2.getPosition(recorder); + } } } @@ -930,16 +1011,29 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class TimestampTreeWriter extends TreeWriter { private final RunLengthIntegerWriter seconds; private final RunLengthIntegerWriter nanos; + private final IntegerCompressionWriter secondsV2; + private final IntegerCompressionWriter nanosV2; TimestampTreeWriter(int columnId, ObjectInspector inspector, StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); - this.seconds = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), true); - this.nanos = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), false); + if (super.useDirectV2Encoding) { + this.seconds = null; + this.nanos = null; + this.secondsV2 = new IntegerCompressionWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), true); + this.nanosV2 = new IntegerCompressionWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), false); + } else { + this.secondsV2 = null; + this.nanosV2 = null; + this.seconds = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), true); + this.nanos = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), false); + } recordPosition(rowIndexPosition); } @@ -950,8 +1044,17 @@ void write(Object obj) throws IOException { Timestamp val = ((TimestampObjectInspector) inspector). 
getPrimitiveJavaObject(obj); - seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP); - nanos.write(formatNanos(val.getNanos())); + if (seconds != null) { + seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP); + } else { + secondsV2.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP); + } + + if (nanos != null) { + nanos.write(formatNanos(val.getNanos())); + } else { + nanosV2.write(formatNanos(val.getNanos())); + } } } @@ -959,8 +1062,17 @@ void write(Object obj) throws IOException { void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - seconds.flush(); - nanos.flush(); + if (seconds != null) { + seconds.flush(); + } else { + secondsV2.flush(); + } + + if (nanos != null) { + nanos.flush(); + } else { + nanosV2.flush(); + } recordPosition(rowIndexPosition); } @@ -983,14 +1095,23 @@ private static long formatNanos(int nanos) { @Override void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); - seconds.getPosition(recorder); - nanos.getPosition(recorder); + if (seconds != null) { + seconds.getPosition(recorder); + } else { + secondsV2.getPosition(recorder); + } + if (nanos != null) { + nanos.getPosition(recorder); + } else { + nanosV2.getPosition(recorder); + } } } private static class DecimalTreeWriter extends TreeWriter { private final PositionedOutputStream valueStream; private final RunLengthIntegerWriter scaleStream; + private final IntegerCompressionWriter scaleStreamV2; DecimalTreeWriter(int columnId, ObjectInspector inspector, @@ -998,8 +1119,15 @@ void recordPosition(PositionRecorder recorder) throws IOException { boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); - scaleStream = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), true); + if (super.useDirectV2Encoding) { + this.scaleStream = null; + this.scaleStreamV2 = new IntegerCompressionWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), true); + } else { + this.scaleStreamV2 = null; + scaleStream = new RunLengthIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), true); + } recordPosition(rowIndexPosition); } @@ -1011,7 +1139,11 @@ void write(Object obj) throws IOException { getPrimitiveJavaObject(obj); SerializationUtils.writeBigInteger(valueStream, decimal.unscaledValue()); - scaleStream.write(decimal.scale()); + if (scaleStream != null) { + scaleStream.write(decimal.scale()); + } else { + scaleStreamV2.write(decimal.scale()); + } indexStatistics.updateDecimal(decimal); } } @@ -1021,7 +1153,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); valueStream.flush(); - scaleStream.flush(); + if (scaleStream != null) { + scaleStream.flush(); + } else { + scaleStreamV2.flush(); + } recordPosition(rowIndexPosition); } @@ -1029,7 +1165,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); valueStream.getPosition(recorder); - scaleStream.getPosition(recorder); + if (scaleStream != null) { + scaleStream.getPosition(recorder); + } else { + scaleStreamV2.getPosition(recorder); + } } } @@ -1077,6 +1217,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, private static class 
ListTreeWriter extends TreeWriter { private final RunLengthIntegerWriter lengths; + private final IntegerCompressionWriter lengthsV2; ListTreeWriter(int columnId, ObjectInspector inspector, @@ -1088,9 +1229,15 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, childrenWriters[0] = createTreeWriter(listObjectInspector.getListElementObjectInspector(), writer, true); - lengths = - new RunLengthIntegerWriter(writer.createStream(columnId, + if (super.useDirectV2Encoding) { + lengths = null; + lengthsV2 = new IntegerCompressionWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false); + } else { + lengthsV2 = null; + lengths = new RunLengthIntegerWriter(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false); + } recordPosition(rowIndexPosition); } @@ -1100,7 +1247,11 @@ void write(Object obj) throws IOException { if (obj != null) { ListObjectInspector insp = (ListObjectInspector) inspector; int len = insp.getListLength(obj); - lengths.write(len); + if (lengths != null) { + lengths.write(len); + } else { + lengthsV2.write(len); + } for(int i=0; i < len; ++i) { childrenWriters[0].write(insp.getListElement(obj, i)); } @@ -1111,7 +1262,11 @@ void write(Object obj) throws IOException { void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - lengths.flush(); + if (lengths != null) { + lengths.flush(); + } else { + lengthsV2.flush(); + } for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } @@ -1121,12 +1276,17 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, @Override void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); - lengths.getPosition(recorder); + if (lengths != null) { + lengths.getPosition(recorder); + } else { + lengthsV2.getPosition(recorder); + } } } private static class MapTreeWriter extends TreeWriter { private final RunLengthIntegerWriter lengths; + private final IntegerCompressionWriter lengthsV2; MapTreeWriter(int columnId, ObjectInspector inspector, @@ -1139,9 +1299,17 @@ void recordPosition(PositionRecorder recorder) throws IOException { createTreeWriter(insp.getMapKeyObjectInspector(), writer, true); childrenWriters[1] = createTreeWriter(insp.getMapValueObjectInspector(), writer, true); - lengths = - new RunLengthIntegerWriter(writer.createStream(columnId, - OrcProto.Stream.Kind.LENGTH), false); + if (super.useDirectV2Encoding) { + lengths = null; + lengthsV2 = + new IntegerCompressionWriter(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false); + } else { + lengthsV2 = null; + lengths = + new RunLengthIntegerWriter(writer.createStream(columnId, + OrcProto.Stream.Kind.LENGTH), false); + } recordPosition(rowIndexPosition); } @@ -1151,7 +1319,11 @@ void write(Object obj) throws IOException { if (obj != null) { MapObjectInspector insp = (MapObjectInspector) inspector; int len = insp.getMapSize(obj); - lengths.write(len); + if (lengths != null) { + lengths.write(len); + } else { + lengthsV2.write(len); + } // this sucks, but it will have to do until we can get a better // accessor in the MapObjectInspector. 
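// (getMap() may force some lazy inspectors to materialize the whole map,
// which is the cost the comment above laments; the write path only needs
// a single pass over the entries, in any stable order, since the LENGTH
// stream records the entry count and the children absorb the flattened
// keys and values.)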
Map valueMap = insp.getMap(obj); @@ -1166,7 +1338,11 @@ void write(Object obj) throws IOException { void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - lengths.flush(); + if (lengths != null) { + lengths.flush(); + } else { + lengthsV2.flush(); + } for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } @@ -1176,7 +1352,11 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, @Override void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); - lengths.getPosition(recorder); + if (lengths != null) { + lengths.getPosition(recorder); + } else { + lengthsV2.getPosition(recorder); + } } } diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto index d19bc06..994be70 100644 --- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto +++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto @@ -66,6 +66,7 @@ message ColumnEncoding { enum Kind { DIRECT = 0; DICTIONARY = 1; + DIRECT_V2 = 2; } required Kind kind = 1; optional uint32 dictionarySize = 2; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java new file mode 100644 index 0000000..99f7d9a --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.orc; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.Random; + +import org.junit.Test; + +import com.google.common.primitives.Longs; + +public class TestBitPack { + + private static final int SIZE = 100; + private static Random rand = new Random(100); + + private long[] deltaEncode(long[] inp) { + long[] output = new long[inp.length]; + for (int i = 0; i < inp.length; i++) { + output[i] = Utils.zigzagEncode(inp[i]); + } + return output; + } + + private void runTest(int numBits) { + long[] inp = new long[SIZE]; + for (int i = 0; i < SIZE; i++) { + long val = 0; + if (numBits <= 32) { + val = rand.nextInt((int) Math.pow(2, numBits - 1)); + } else { + val = Utils.nextLong(rand, (long) Math.pow(2, numBits - 2)); + } + if (val % 2 == 0) { + val = -val; + } + inp[i] = val; + } + long[] deltaEncoded = deltaEncode(inp); + long minInput = Collections.min(Longs.asList(deltaEncoded)); + long maxInput = Collections.max(Longs.asList(deltaEncoded)); + long rangeInput = maxInput - minInput; + int fixedWidth = Utils.findNumBits(rangeInput); + fixedWidth = FixedBitSizes.getClosestFixedBits(fixedWidth); + BitPackWriter bpw = new BitPackWriter(fixedWidth); + byte[] packed = bpw.pack(deltaEncoded, deltaEncoded.length); + + BitPackReader bpr = new BitPackReader(fixedWidth); + long[] deltaDec = bpr.unpack(packed, deltaEncoded.length); + long[] out = new long[deltaDec.length]; + for (int i = 0; i < deltaDec.length; i++) { + out[i] = Utils.zigzagDecode(deltaDec[i]); + } + assertEquals(numBits, fixedWidth); + assertArrayEquals(inp, out); + } + + @Test + public void test01BitPacking1Bit() { + long[] inp = new long[SIZE]; + for (int i = 0; i < SIZE; i++) { + inp[i] = -1 * rand.nextInt(2); + } + long[] deltaEncoded = deltaEncode(inp); + long minInput = Collections.min(Longs.asList(deltaEncoded)); + long maxInput = Collections.max(Longs.asList(deltaEncoded)); + long rangeInput = maxInput - minInput; + int fixedWidth = Utils.findNumBits(rangeInput); + fixedWidth = FixedBitSizes.getClosestFixedBits(fixedWidth); + BitPackWriter bpw = new BitPackWriter(fixedWidth); + byte[] packed = bpw.pack(deltaEncoded, deltaEncoded.length); + BitPackReader bpr = new BitPackReader(fixedWidth); + long[] deltaDec = bpr.unpack(packed, SIZE); + long[] out = new long[deltaDec.length]; + for (int i = 0; i < deltaDec.length; i++) { + out[i] = Utils.zigzagDecode(deltaDec[i]); + } + assertEquals(1, fixedWidth); + assertArrayEquals(inp, out); + } + + @Test + public void test02BitPacking2Bit() { + runTest(2); + } + + @Test + public void test03BitPacking3Bit() { + runTest(3); + } + + @Test + public void test04BitPacking4Bit() { + runTest(4); + } + + @Test + public void test05BitPacking5Bit() { + runTest(5); + } + + @Test + public void test06BitPacking6Bit() { + runTest(6); + } + + @Test + public void test07BitPacking7Bit() { + runTest(7); + } + + @Test + public void test08BitPacking8Bit() { + runTest(8); + } + + @Test + public void test09BitPacking9Bit() { + runTest(9); + } + + @Test + public void test10BitPacking10Bit() { + runTest(10); + } + + @Test + public void test11BitPacking11Bit() { + runTest(11); + } + + @Test + public void test12BitPacking12Bit() { + runTest(12); + } + + @Test + public void test13BitPacking13Bit() { + runTest(13); + } + + @Test + public void test14BitPacking14Bit() { + runTest(14); + } + + @Test + public void test15BitPacking15Bit() { + runTest(15); + } + + @Test + public void 
test16BitPacking16Bit() { + runTest(16); + } + + @Test + public void test17BitPacking17Bit() { + runTest(17); + } + + @Test + public void test18BitPacking18Bit() { + runTest(18); + } + + @Test + public void test19BitPacking19Bit() { + runTest(19); + } + + @Test + public void test20BitPacking20Bit() { + runTest(20); + } + + @Test + public void test21BitPacking21Bit() { + runTest(21); + } + + @Test + public void test22BitPacking22Bit() { + runTest(22); + } + + @Test + public void test23BitPacking23Bit() { + runTest(23); + } + + @Test + public void test24BitPacking24Bit() { + runTest(24); + } + + @Test + public void test26BitPacking26Bit() { + runTest(26); + } + + @Test + public void test28BitPacking28Bit() { + runTest(28); + } + + @Test + public void test30BitPacking30Bit() { + runTest(30); + } + + @Test + public void test32BitPacking32Bit() { + runTest(32); + } + + @Test + public void test40BitPacking40Bit() { + runTest(40); + } + + @Test + public void test48BitPacking48Bit() { + runTest(48); + } + + @Test + public void test56BitPacking56Bit() { + runTest(56); + } + + @Test + public void test64BitPacking64Bit() { + runTest(64); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java new file mode 100644 index 0000000..762e3d0 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.orc; + +import static junit.framework.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.junit.Test; + +public class TestIntegerCompressionReader { + + public void runSeekTest(CompressionCodec codec) throws Exception { + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + IntegerCompressionWriter out = new IntegerCompressionWriter( + new OutStream("test", 1000, codec, collect), true); + TestInStream.PositionCollector[] positions = + new TestInStream.PositionCollector[4096]; + Random random = new Random(99); + int[] junk = new int[2048]; + for(int i=0; i < junk.length; ++i) { + junk[i] = random.nextInt(); + } + for(int i=0; i < 4096; ++i) { + positions[i] = new TestInStream.PositionCollector(); + out.getPosition(positions[i]); + // test runs, incrementing runs, non-runs + if (i < 1024) { + out.write(i/4); + } else if (i < 2048) { + out.write(2*i); + } else { + out.write(junk[i-2048]); + } + } + out.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + IntegerCompressionReader in = new IntegerCompressionReader(InStream.create + ("test", inBuf, codec, 1000), true); + for(int i=0; i < 2048; ++i) { + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i/4, x); + } else if (i < 2048) { + assertEquals(2*i, x); + } else { + assertEquals(junk[i-2048], x); + } + } + for(int i=2047; i >= 0; --i) { + in.seek(positions[i]); + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i/4, x); + } else if (i < 2048) { + assertEquals(2*i, x); + } else { + assertEquals(junk[i-2048], x); + } + } + } + + @Test + public void testUncompressedSeek() throws Exception { + runSeekTest(null); + } + + @Test + public void testCompressedSeek() throws Exception { + runSeekTest(new ZlibCodec()); + } + + @Test + public void testSkips() throws Exception { + TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); + IntegerCompressionWriter out = new IntegerCompressionWriter( + new OutStream("test", 100, null, collect), true); + for(int i=0; i < 2048; ++i) { + if (i < 1024) { + out.write(i); + } else { + out.write(256 * i); + } + } + out.flush(); + ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); + collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); + inBuf.flip(); + IntegerCompressionReader in = new IntegerCompressionReader(InStream.create + ("test", inBuf, null, 100), true); + for(int i=0; i < 2048; i += 10) { + int x = (int) in.next(); + if (i < 1024) { + assertEquals(i, x); + } else { + assertEquals(256 * i, x); + } + if (i < 2038) { + in.skip(9); + } + in.skip(0); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java new file mode 100644 index 0000000..9cc4be6 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java @@ -0,0 +1,542 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + +public class TestNewIntegerEncoding { + + public static class Row { + Integer int1; + Long long1; + + public Row(int val, long l) { + this.int1 = val; + this.long1 = l; + } + } + + public List fetchData(String path) throws IOException { + List input = new ArrayList(); + FileInputStream stream = new FileInputStream(new File(path)); + try { + FileChannel fc = stream.getChannel(); + MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()); + /* Instead of using default, pass in a decoder. */ + String[] lines = Charset.defaultCharset().decode(bb).toString() + .split("\n"); + for (String line : lines) { + long val = 0; + try { + val = Long.parseLong(line); + } catch (NumberFormatException e) { + // for now lets ignore (assign 0) + } + input.add(val); + } + } finally { + stream.close(); + } + return input; + } + + Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + + File.separator + "test" + File.separator + "tmp")); + + Configuration conf; + FileSystem fs; + Path testFilePath; + String resDir = "src/test/resources"; + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcFile." 
+ + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } + + @Test + public void testBasicRow() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + writer.addRow(new Row(111, 1111L)); + writer.addRow(new Row(111, 1111L)); + writer.addRow(new Row(111, 1111L)); + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(new IntWritable(111), ((OrcStruct) row).getFieldValue(0)); + assertEquals(new LongWritable(1111), ((OrcStruct) row).getFieldValue(1)); + } + } + + @Test + public void testBasic() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, + 2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, + 9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1, + 1, 1, 1, 1 }; + List input = Lists.newArrayList(Longs.asList(inp)); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testIntegerMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add((long) Integer.MIN_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testIntegerMax() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add((long) Integer.MAX_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) 
row).get()); + } + } + + @Test + public void testLongMin() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add(Long.MIN_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testLongMax() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + input.add(Long.MAX_VALUE); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testRandomInt() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 100000; i++) { + input.add((long) rand.nextInt()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testRandomLong() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 100000; i++) { + input.add(rand.nextLong()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt0() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + 
List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(0, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt1() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(1, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt255() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(255, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBaseAt256() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(256, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBase510() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + 
List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(510, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testPatchedBase511() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 5120; i++) { + input.add((long) rand.nextInt(100)); + } + input.set(511, 20000L); + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.ZLIB, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 0; + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } + + @Test + public void testSeek() throws Exception { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector( + Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + List input = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 100000; i++) { + input.add((long) rand.nextInt()); + } + + Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector, 100000, + CompressionKind.NONE, 10000, 10000); + for (Long l : input) { + writer.addRow(l); + } + writer.close(); + + Reader reader = OrcFile.createReader(fs, testFilePath); + RecordReader rows = reader.rows(null); + int idx = 55555; + rows.seekToRow(idx); + while (rows.hasNext()) { + Object row = rows.next(null); + assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get()); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java index 9f989fd..2f7a7f1 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.io.orc;
 
 import static junit.framework.Assert.assertEquals;
diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out
index 8b88931..50e8e91 100644
--- ql/src/test/resources/orc-file-dump.out
+++ ql/src/test/resources/orc-file-dump.out
@@ -11,73 +11,73 @@ Statistics:
   Column 3: count: 21000 min: Darkness, max: worst
 Stripes:
-  Stripe: offset: 3 data: 69605 rows: 5000 tail: 72 index: 119
+  Stripe: offset: 3 data: 53140 rows: 4163 tail: 72 index: 119
     Stream: column 0 section ROW_INDEX start: 3 length 10
     Stream: column 1 section ROW_INDEX start: 13 length 35
     Stream: column 2 section ROW_INDEX start: 48 length 39
     Stream: column 3 section ROW_INDEX start: 87 length 35
-    Stream: column 1 section DATA start: 122 length 22605
-    Stream: column 2 section DATA start: 22727 length 43426
-    Stream: column 3 section DATA start: 66153 length 3403
-    Stream: column 3 section LENGTH start: 69556 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 69594 length 133
-    Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
+    Stream: column 1 section DATA start: 122 length 16676
+    Stream: column 2 section DATA start: 16798 length 33334
+    Stream: column 3 section DATA start: 50132 length 2972
+    Stream: column 3 section LENGTH start: 53104 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 53129 length 133
+    Encoding column 0: DIRECT_V2
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
     Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 69799 data: 69584 rows: 5000 tail: 73 index: 118
-    Stream: column 0 section ROW_INDEX start: 69799 length 10
-    Stream: column 1 section ROW_INDEX start: 69809 length 34
-    Stream: column 2 section ROW_INDEX start: 69843 length 39
-    Stream: column 3 section ROW_INDEX start: 69882 length 35
-    Stream: column 1 section DATA start: 69917 length 22597
-    Stream: column 2 section DATA start: 92514 length 43439
-    Stream: column 3 section DATA start: 135953 length 3377
-    Stream: column 3 section LENGTH start: 139330 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 139368 length 133
-    Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
+  Stripe: offset: 53334 data: 63747 rows: 5000 tail: 73 index: 120
+    Stream: column 0 section ROW_INDEX start: 53334 length 10
+    Stream: column 1 section ROW_INDEX start: 53344 length 36
+    Stream: column 2 section ROW_INDEX start: 53380 length 39
+    Stream: column 3 section ROW_INDEX start: 53419 length 35
+    Stream: column 1 section DATA start: 53454 length 20029
+    Stream: column 2 section DATA start: 73483 length 40035
+    Stream: column 3 section DATA start: 113518 length 3525
+    Stream: column 3 section LENGTH start: 117043 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 117068 length 133
+    Encoding column 0: DIRECT_V2
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
     Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 139574 data: 69570 rows: 5000 tail: 73 index: 120
-    Stream: column 0 section ROW_INDEX start: 139574 length 10
-    Stream: column 1 section ROW_INDEX start: 139584 length 36
-    Stream: column 2 section ROW_INDEX start: 139620 length 39
-    Stream: column 3 section ROW_INDEX start: 139659 length 35
-    Stream: column 1 section DATA start: 139694 length 22594
-    Stream: column 2 section DATA start: 162288 length 43415
-    Stream: column 3 section DATA start: 205703 length 3390
-    Stream: column 3 section LENGTH start: 209093 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 209131 length 133
-    Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
+  Stripe: offset: 117274 data: 63767 rows: 5000 tail: 73 index: 120
+    Stream: column 0 section ROW_INDEX start: 117274 length 10
+    Stream: column 1 section ROW_INDEX start: 117284 length 36
+    Stream: column 2 section ROW_INDEX start: 117320 length 39
+    Stream: column 3 section ROW_INDEX start: 117359 length 35
+    Stream: column 1 section DATA start: 117394 length 20029
+    Stream: column 2 section DATA start: 137423 length 40035
+    Stream: column 3 section DATA start: 177458 length 3545
+    Stream: column 3 section LENGTH start: 181003 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 181028 length 133
+    Encoding column 0: DIRECT_V2
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
     Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 209337 data: 69551 rows: 5000 tail: 72 index: 119
-    Stream: column 0 section ROW_INDEX start: 209337 length 10
-    Stream: column 1 section ROW_INDEX start: 209347 length 35
-    Stream: column 2 section ROW_INDEX start: 209382 length 39
-    Stream: column 3 section ROW_INDEX start: 209421 length 35
-    Stream: column 1 section DATA start: 209456 length 22575
-    Stream: column 2 section DATA start: 232031 length 43426
-    Stream: column 3 section DATA start: 275457 length 3379
-    Stream: column 3 section LENGTH start: 278836 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 278874 length 133
-    Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
-    Encoding column 3: DICTIONARY[35]
-  Stripe: offset: 279079 data: 14096 rows: 1000 tail: 68 index: 120
-    Stream: column 0 section ROW_INDEX start: 279079 length 10
-    Stream: column 1 section ROW_INDEX start: 279089 length 36
-    Stream: column 2 section ROW_INDEX start: 279125 length 39
-    Stream: column 3 section ROW_INDEX start: 279164 length 35
-    Stream: column 1 section DATA start: 279199 length 4529
-    Stream: column 2 section DATA start: 283728 length 8690
-    Stream: column 3 section DATA start: 292418 length 706
-    Stream: column 3 section LENGTH start: 293124 length 38
-    Stream: column 3 section DICTIONARY_DATA start: 293162 length 133
-    Encoding column 0: DIRECT
-    Encoding column 1: DIRECT
-    Encoding column 2: DIRECT
+  Stripe: offset: 181234 data: 63786 rows: 5000 tail: 73 index: 120
+    Stream: column 0 section ROW_INDEX start: 181234 length 10
+    Stream: column 1 section ROW_INDEX start: 181244 length 36
+    Stream: column 2 section ROW_INDEX start: 181280 length 39
+    Stream: column 3 section ROW_INDEX start: 181319 length 35
+    Stream: column 1 section DATA start: 181354 length 20029
+    Stream: column 2 section DATA start: 201383 length 40035
+    Stream: column 3 section DATA start: 241418 length 3564
+    Stream: column 3 section LENGTH start: 244982 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 245007 length 133
+    Encoding column 0: DIRECT_V2
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
     Encoding column 3: DICTIONARY[35]
+  Stripe: offset: 245213 data: 23583 rows: 1837 tail: 68 index: 120
+    Stream: column 0 section ROW_INDEX start: 245213 length 10
+    Stream: column 1 section ROW_INDEX start: 245223 length 36
+    Stream: column 2 section ROW_INDEX start: 245259 length 39
+    Stream: column 3 section ROW_INDEX start: 245298 length 35
+    Stream: column 1 section DATA start: 245333 length 7359
+    Stream: column 2 section DATA start: 252692 length 14710
+    Stream: column 3 section DATA start: 267402 length 1356
+    Stream: column 3 section LENGTH start: 268758 length 25
+    Stream: column 3 section DICTIONARY_DATA start: 268783 length 133
+    Encoding column 0: DIRECT_V2
+    Encoding column 1: DIRECT_V2
+    Encoding column 2: DIRECT_V2
+    Encoding column 3: DICTIONARY[35]
\ No newline at end of file
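
Reviewer note (not part of the patch): the testPatchedBase* cases above all follow the same recipe -- 5120 values drawn from [0, 100) with a single 20000 planted at a chosen position, with the positions (0, 1, 255, 256, 510, 511) presumably probing the edges of the encoder's 512-value run window. One outlier among narrow values is exactly the shape of data where a patched encoding beats widening every value. The sketch below is illustrative arithmetic only: the class name and the 64-bit patch-overhead allowance are assumptions for the sketch, not Hive or ORC code, but it shows the size trade-off these tests exercise.

// Illustrative only: estimates encoding cost for 5120 values in [0, 100)
// plus one planted 20000, the data shape used by the tests above.
public class PatchedBaseIntuition {
  // Smallest bit width needed to represent a non-negative value.
  static int bitsNeeded(long value) {
    return 64 - Long.numberOfLeadingZeros(value | 1);
  }

  public static void main(String[] args) {
    long n = 5120;                        // values per test input
    int smallBits = bitsNeeded(99);       // 7 bits covers 0..99
    int outlierBits = bitsNeeded(20000);  // 15 bits for the outlier

    // Widening every value to the worst-case width set by the one outlier:
    long widenedBits = n * outlierBits;
    // Keeping the small width and recording the outlier separately as a
    // patch (its extra bits plus an assumed 64-bit header allowance):
    long patchedBits = n * smallBits + (outlierBits - smallBits) + 64;

    System.out.printf("widened: ~%d bytes, patched: ~%d bytes%n",
        widenedBits / 8, patchedBits / 8);
  }
}

For the tests' data this works out to roughly 9600 bytes widened versus roughly 4500 bytes patched. A similar saving shows up in the smaller DATA stream lengths in the updated orc-file-dump.out above, though that file exercises different data.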