diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d26573e..32ec10d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -552,6 +552,9 @@ true), // Define the default compression codec for ORC file HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"), + // Define the default encoding strategy to use + HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "BEST_SPEED", + new StringsValidator("BEST_SPEED", "BEST_COMPRESSION")), HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false), HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000), HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10), diff --git conf/hive-default.xml.template conf/hive-default.xml.template index 8a74e4e..b508181 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1970,6 +1970,17 @@ + hive.exec.orc.encoding.strategy + BEST_SPEED + + Define the encoding strategy to use while writing data. Changing this will + only affect the light weight encoding for integers. This flag will not + change the compression level of higher level compression codec (like ZLIB). + Possible options are BEST_SPEED and BEST_COMPRESSION. 
+ + + + hive.exec.orc.dictionary.key.size.threshold 0.8 diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 7c542c1..8519fb8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -95,6 +95,9 @@ public int getMinor() { } } + public static enum EncodingStrategy { + BEST_SPEED, BEST_COMPRESSION; + } // Note : these string definitions for table properties are deprecated, // and retained only for backward compatibility, please do not add to @@ -117,7 +120,8 @@ public int getMinor() { STRIPE_SIZE("orc.stripe.size"), ROW_INDEX_STRIDE("orc.row.index.stride"), ENABLE_INDEXES("orc.create.index"), - BLOCK_PADDING("orc.block.padding"); + BLOCK_PADDING("orc.block.padding"), + ENCODING_STRATEGY("orc.encoding.strategy"); private final String propName; @@ -221,6 +225,7 @@ public static Reader createReader(Path path, private MemoryManager memoryManagerValue; private Version versionValue; private WriterCallback callback; + private EncodingStrategy encodingStrategy; WriterOptions(Configuration conf) { configuration = conf; @@ -250,6 +255,13 @@ public static Reader createReader(Path path, } else { versionValue = Version.byName(versionName); } + String enString = + conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname); + if (enString == null) { + encodingStrategy = EncodingStrategy.BEST_SPEED; + } else { + encodingStrategy = EncodingStrategy.valueOf(enString); + } } /** @@ -301,6 +313,14 @@ public WriterOptions blockPadding(boolean value) { } /** + * Sets the encoding strategy that is used to encode the data. + */ + public WriterOptions encodingStrategy(EncodingStrategy strategy) { + encodingStrategy = strategy; + return this; + } + + /** * Sets the generic compression that is used to compress the data. 
*/ public WriterOptions compress(CompressionKind value) { @@ -370,7 +390,7 @@ public static Writer createWriter(Path path, opts.stripeSizeValue, opts.compressValue, opts.bufferSizeValue, opts.rowIndexStrideValue, opts.memoryManagerValue, opts.blockPaddingValue, - opts.versionValue, opts.callback); + opts.versionValue, opts.callback, opts.encodingStrategy); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 578d923..00e0807 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy; import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -164,6 +165,11 @@ private String getSettingFromPropsFallingBackToConf(String key, Properties props options.blockPadding(Boolean.parseBoolean(propVal)); } + if ((propVal = getSettingFromPropsFallingBackToConf( + OrcFile.OrcTableProperties.ENCODING_STRATEGY.getPropName(),props,conf)) != null){ + options.encodingStrategy(EncodingStrategy.valueOf(propVal)); + } + return options; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index e033232..4ec07cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.sql.Date; import java.sql.Timestamp; import java.util.ArrayList; import 
java.util.Arrays; @@ -746,9 +745,11 @@ void skipRows(long items) throws IOException { private static class FloatTreeReader extends TreeReader{ private InStream stream; + private final SerializationUtils utils; FloatTreeReader(Path path, int columnId, Configuration conf) { super(path, columnId, conf); + this.utils = new SerializationUtils(); } @Override @@ -777,7 +778,7 @@ Object next(Object previous) throws IOException { } else { result = (FloatWritable) previous; } - result.set(SerializationUtils.readFloat(stream)); + result.set(utils.readFloat(stream)); } return result; } @@ -797,7 +798,7 @@ Object nextVector(Object previousVector, long batchSize) throws IOException { // Read value entries based on isNull entries for (int i = 0; i < batchSize; i++) { if (!result.isNull[i]) { - result.vector[i] = SerializationUtils.readFloat(stream); + result.vector[i] = utils.readFloat(stream); } else { // If the value is not present then set NaN @@ -819,16 +820,18 @@ Object nextVector(Object previousVector, long batchSize) throws IOException { void skipRows(long items) throws IOException { items = countNonNulls(items); for(int i=0; i < items; ++i) { - SerializationUtils.readFloat(stream); + utils.readFloat(stream); } } } private static class DoubleTreeReader extends TreeReader{ private InStream stream; + private final SerializationUtils utils; DoubleTreeReader(Path path, int columnId, Configuration conf) { super(path, columnId, conf); + this.utils = new SerializationUtils(); } @Override @@ -858,7 +861,7 @@ Object next(Object previous) throws IOException { } else { result = (DoubleWritable) previous; } - result.set(SerializationUtils.readDouble(stream)); + result.set(utils.readDouble(stream)); } return result; } @@ -878,7 +881,7 @@ Object nextVector(Object previousVector, long batchSize) throws IOException { // Read value entries based on isNull entries for (int i = 0; i < batchSize; i++) { if (!result.isNull[i]) { - result.vector[i] = SerializationUtils.readDouble(stream); + 
result.vector[i] = utils.readDouble(stream); } else { // If the value is not present then set NaN result.vector[i] = Double.NaN; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java index 1756c3c..9642971 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java @@ -34,10 +34,12 @@ private int delta = 0; private int used = 0; private boolean repeat = false; + private SerializationUtils utils; RunLengthIntegerReader(InStream input, boolean signed) throws IOException { this.input = input; this.signed = signed; + this.utils = new SerializationUtils(); } private void readValues() throws IOException { @@ -55,9 +57,9 @@ private void readValues() throws IOException { // convert from 0 to 255 to -128 to 127 by converting to a signed byte delta = (byte) (0 + delta); if (signed) { - literals[0] = SerializationUtils.readVslong(input); + literals[0] = utils.readVslong(input); } else { - literals[0] = SerializationUtils.readVulong(input); + literals[0] = utils.readVulong(input); } } else { repeat = false; @@ -65,9 +67,9 @@ private void readValues() throws IOException { used = 0; for(int i=0; i < numLiterals; ++i) { if (signed) { - literals[i] = SerializationUtils.readVslong(input); + literals[i] = utils.readVslong(input); } else { - literals[i] = SerializationUtils.readVulong(input); + literals[i] = utils.readVulong(input); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java index a7e66a8..4057036 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java @@ -39,12 +39,14 @@ private int numLiterals = 0; private int used = 0; private final boolean 
skipCorrupt; + private final SerializationUtils utils; RunLengthIntegerReaderV2(InStream input, boolean signed, Configuration conf) throws IOException { this.input = input; this.signed = signed; this.skipCorrupt = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); + this.utils = new SerializationUtils(); } private void readValues() throws IOException { @@ -71,7 +73,7 @@ private void readDeltaValues(int firstByte) throws IOException { // extract the number of fixed bits int fb = (firstByte >>> 1) & 0x1f; if (fb != 0) { - fb = SerializationUtils.decodeBitWidth(fb); + fb = utils.decodeBitWidth(fb); } // extract the blob run length @@ -81,9 +83,9 @@ private void readDeltaValues(int firstByte) throws IOException { // read the first value stored as vint long firstVal = 0; if (signed) { - firstVal = SerializationUtils.readVslong(input); + firstVal = utils.readVslong(input); } else { - firstVal = SerializationUtils.readVulong(input); + firstVal = utils.readVulong(input); } // store first value to result buffer @@ -94,14 +96,14 @@ private void readDeltaValues(int firstByte) throws IOException { if (fb == 0) { // read the fixed delta value stored as vint (deltas can be negative even // if all number are positive) - long fd = SerializationUtils.readVslong(input); + long fd = utils.readVslong(input); // add fixed deltas to adjacent values for(int i = 0; i < len; i++) { literals[numLiterals++] = literals[numLiterals - 2] + fd; } } else { - long deltaBase = SerializationUtils.readVslong(input); + long deltaBase = utils.readVslong(input); // add delta base and first value literals[numLiterals++] = firstVal + deltaBase; prevVal = literals[numLiterals - 1]; @@ -110,7 +112,7 @@ private void readDeltaValues(int firstByte) throws IOException { // write the unpacked values, add it to previous value and store final // value to result buffer. 
if the delta base value is negative then it // is a decreasing sequence else an increasing sequence - SerializationUtils.readInts(literals, numLiterals, len, fb, input); + utils.readInts(literals, numLiterals, len, fb, input); while (len > 0) { if (deltaBase < 0) { literals[numLiterals] = prevVal - literals[numLiterals]; @@ -128,7 +130,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { // extract the number of fixed bits int fbo = (firstByte >>> 1) & 0x1f; - int fb = SerializationUtils.decodeBitWidth(fbo); + int fb = utils.decodeBitWidth(fbo); // extract the run length of data blob int len = (firstByte & 0x01) << 8; @@ -144,7 +146,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { // extract patch width int pwo = thirdByte & 0x1f; - int pw = SerializationUtils.decodeBitWidth(pwo); + int pw = utils.decodeBitWidth(pwo); // read fourth byte and extract patch gap width int fourthByte = input.read(); @@ -156,7 +158,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { int pl = fourthByte & 0x1f; // read the next base width number of bytes to extract base value - long base = SerializationUtils.bytesToLongBE(input, bw); + long base = utils.bytesToLongBE(input, bw); long mask = (1L << ((bw * 8) - 1)); // if MSB of base value is 1 then base is negative value else positive if ((base & mask) != 0) { @@ -166,7 +168,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { // unpack the data blob long[] unpacked = new long[len]; - SerializationUtils.readInts(unpacked, 0, len, fb, input); + utils.readInts(unpacked, 0, len, fb, input); // unpack the patch blob long[] unpackedPatch = new long[pl]; @@ -174,8 +176,8 @@ private void readPatchedBaseValues(int firstByte) throws IOException { if ((pw + pgw) > 64 && !skipCorrupt) { throw new IOException(ErrorMsg.ORC_CORRUPTED_READ.getMsg()); } - int bitSize = SerializationUtils.getClosestFixedBits(pw + pgw); - 
SerializationUtils.readInts(unpackedPatch, 0, pl, bitSize, input); + int bitSize = utils.getClosestFixedBits(pw + pgw); + utils.readInts(unpackedPatch, 0, pl, bitSize, input); // apply the patch directly when decoding the packed data int patchIdx = 0; @@ -241,7 +243,7 @@ private void readDirectValues(int firstByte) throws IOException { // extract the number of fixed bits int fbo = (firstByte >>> 1) & 0x1f; - int fb = SerializationUtils.decodeBitWidth(fbo); + int fb = utils.decodeBitWidth(fbo); // extract the run length int len = (firstByte & 0x01) << 8; @@ -250,11 +252,10 @@ private void readDirectValues(int firstByte) throws IOException { len += 1; // write the unpacked values and zigzag decode to result buffer - SerializationUtils.readInts(literals, numLiterals, len, fb, input); + utils.readInts(literals, numLiterals, len, fb, input); if (signed) { for(int i = 0; i < len; i++) { - literals[numLiterals] = SerializationUtils - .zigzagDecode(literals[numLiterals]); + literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]); numLiterals++; } } else { @@ -275,10 +276,10 @@ private void readShortRepeatValues(int firstByte) throws IOException { len += RunLengthIntegerWriterV2.MIN_REPEAT; // read the repeated value which is store using fixed bytes - long val = SerializationUtils.bytesToLongBE(input, size); + long val = utils.bytesToLongBE(input, size); if (signed) { - val = SerializationUtils.zigzagDecode(val); + val = utils.zigzagDecode(val); } // repeat the value for length times diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java index 539f8df..078eae8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java @@ -38,11 +38,13 @@ private long delta = 0; private boolean repeat = false; private int tailRunLength = 0; + private SerializationUtils utils; 
RunLengthIntegerWriter(PositionedOutputStream output, boolean signed) { this.output = output; this.signed = signed; + this.utils = new SerializationUtils(); } private void writeValues() throws IOException { @@ -51,17 +53,17 @@ private void writeValues() throws IOException { output.write(numLiterals - MIN_REPEAT_SIZE); output.write((byte) delta); if (signed) { - SerializationUtils.writeVslong(output, literals[0]); + utils.writeVslong(output, literals[0]); } else { - SerializationUtils.writeVulong(output, literals[0]); + utils.writeVulong(output, literals[0]); } } else { output.write(-numLiterals); for(int i=0; i < numLiterals; ++i) { if (signed) { - SerializationUtils.writeVslong(output, literals[i]); + utils.writeVslong(output, literals[i]); } else { - SerializationUtils.writeVulong(output, literals[i]); + utils.writeVulong(output, literals[i]); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java index 171ff89..3a0ba1d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java @@ -157,10 +157,19 @@ private long[] gapVsPatchList; private long min; private boolean isFixedDelta; + private SerializationUtils utils; + private boolean alignedBitpacking; RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this(output, signed, true); + } + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed, + boolean alignedBitpacking) { this.output = output; this.signed = signed; + this.alignedBitpacking = alignedBitpacking; + this.utils = new SerializationUtils(); clear(); } @@ -187,6 +196,10 @@ private void writeDeltaValues() throws IOException { int fb = bitsDeltaMax; int efb = 0; + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + if (isFixedDelta) { // if fixed run length is greater than 
threshold then it will be fixed // delta sequence with delta value 0 else fixed delta sequence with @@ -206,20 +219,20 @@ private void writeDeltaValues() throws IOException { if (fb == 1) { fb = 2; } - efb = SerializationUtils.encodeBitWidth(fb); + efb = utils.encodeBitWidth(fb); efb = efb << 1; len = variableRunLength - 1; variableRunLength = 0; } // extract the 9th bit of run length - int tailBits = (len & 0x100) >>> 8; + final int tailBits = (len & 0x100) >>> 8; // create first byte of the header - int headerFirstByte = getOpcode() | efb | tailBits; + final int headerFirstByte = getOpcode() | efb | tailBits; // second byte of the header stores the remaining 8 bits of runlength - int headerSecondByte = len & 0xff; + final int headerSecondByte = len & 0xff; // write header output.write(headerFirstByte); @@ -227,43 +240,50 @@ private void writeDeltaValues() throws IOException { // store the first value from zigzag literal array if (signed) { - SerializationUtils.writeVslong(output, literals[0]); + utils.writeVslong(output, literals[0]); } else { - SerializationUtils.writeVulong(output, literals[0]); + utils.writeVulong(output, literals[0]); } if (isFixedDelta) { // if delta is fixed then we don't need to store delta blob - SerializationUtils.writeVslong(output, fixedDelta); + utils.writeVslong(output, fixedDelta); } else { // store the first value as delta value using zigzag encoding - SerializationUtils.writeVslong(output, adjDeltas[0]); + utils.writeVslong(output, adjDeltas[0]); + // adjacent delta values are bit packed - SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, - output); + utils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, output); } } private void writePatchedBaseValues() throws IOException { + // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding + // because patch is applied to MSB bits. 
For example: If fixed bit width of + // base value is 7 bits and if patch is 3 bits, the actual value is + // constructed by shifting the patch to left by 7 positions. + // actual_value = patch << 7 | base_value + // So, if we align base_value then actual_value can not be reconstructed. + // write the number of fixed bits required in next 5 bits - int fb = brBits95p; - int efb = SerializationUtils.encodeBitWidth(fb) << 1; + final int fb = brBits95p; + final int efb = utils.encodeBitWidth(fb) << 1; // adjust variable run length, they are one off variableRunLength -= 1; // extract the 9th bit of run length - int tailBits = (variableRunLength & 0x100) >>> 8; + final int tailBits = (variableRunLength & 0x100) >>> 8; // create first byte of the header - int headerFirstByte = getOpcode() | efb | tailBits; + final int headerFirstByte = getOpcode() | efb | tailBits; // second byte of the header stores the remaining 8 bits of runlength - int headerSecondByte = variableRunLength & 0xff; + final int headerSecondByte = variableRunLength & 0xff; // if the min value is negative toggle the sign - boolean isNegative = min < 0 ? true : false; + final boolean isNegative = min < 0 ? true : false; if (isNegative) { min = -min; } @@ -271,9 +291,9 @@ private void writePatchedBaseValues() throws IOException { // find the number of bytes required for base and shift it by 5 bits // to accommodate patch width. The additional bit is used to store the sign // of the base value. - int baseWidth = SerializationUtils.findClosestNumBits(min) + 1; - int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1; - int bb = (baseBytes - 1) << 5; + final int baseWidth = utils.findClosestNumBits(min) + 1; + final int baseBytes = baseWidth % 8 == 0 ? 
baseWidth / 8 : (baseWidth / 8) + 1; + final int bb = (baseBytes - 1) << 5; // if the base value is negative then set MSB to 1 if (isNegative) { @@ -282,11 +302,11 @@ private void writePatchedBaseValues() throws IOException { // third byte contains 3 bits for number of bytes occupied by base // and 5 bits for patchWidth - int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth); + final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth); // fourth byte contains 3 bits for page gap width and 5 bits for // patch length - int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; // write header output.write(headerFirstByte); @@ -301,15 +321,16 @@ private void writePatchedBaseValues() throws IOException { } // base reduced literals are bit packed - int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p); - SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, - closestFixedBits, output); + int closestFixedBits = utils.getClosestFixedBits(fb); + + utils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, closestFixedBits, + output); // write patch list - closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth - + patchWidth); - SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, - closestFixedBits, output); + closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth); + + utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, + output); // reset run length variableRunLength = 0; @@ -326,27 +347,32 @@ private int getOpcode() { private void writeDirectValues() throws IOException { // write the number of fixed bits required in next 5 bits - int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1; + int fb = zzBits100p; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + final int efb = utils.encodeBitWidth(fb) << 1; // adjust 
variable run length variableRunLength -= 1; // extract the 9th bit of run length - int tailBits = (variableRunLength & 0x100) >>> 8; + final int tailBits = (variableRunLength & 0x100) >>> 8; // create first byte of the header - int headerFirstByte = getOpcode() | efb | tailBits; + final int headerFirstByte = getOpcode() | efb | tailBits; // second byte of the header stores the remaining 8 bits of runlength - int headerSecondByte = variableRunLength & 0xff; + final int headerSecondByte = variableRunLength & 0xff; // write header output.write(headerFirstByte); output.write(headerSecondByte); // bit packing the zigzag encoded literals - SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, - zzBits100p, output); + utils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, fb, output); // reset run length variableRunLength = 0; @@ -356,13 +382,13 @@ private void writeShortRepeatValues() throws IOException { // get the value that is repeating, compute the bits and bytes required long repeatVal = 0; if (signed) { - repeatVal = SerializationUtils.zigzagEncode(literals[0]); + repeatVal = utils.zigzagEncode(literals[0]); } else { repeatVal = literals[0]; } - int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal); - int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 + final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal); + final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? 
numBitsRepeatVal >>> 3 : (numBitsRepeatVal >>> 3) + 1; // write encoding type in top 2 bits @@ -440,7 +466,7 @@ private void determineEncoding() { // populate zigzag encoded literals long zzEncVal = 0; if (signed) { - zzEncVal = SerializationUtils.zigzagEncode(literals[i]); + zzEncVal = utils.zigzagEncode(literals[i]); } else { zzEncVal = literals[i]; } @@ -464,7 +490,7 @@ private void determineEncoding() { // stores the number of bits required for packing delta blob in // delta encoding - bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax); + bitsDeltaMax = utils.findClosestNumBits(deltaMax); // if decreasing count equals total number of literals then the // sequence is monotonically decreasing @@ -504,10 +530,10 @@ private void determineEncoding() { // is not significant then we can use direct or delta encoding double p = 0.9; - zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p); + zzBits90p = utils.percentileBits(zigzagLiterals, p); p = 1.0; - zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p); + zzBits100p = utils.percentileBits(zigzagLiterals, p); int diffBitsLH = zzBits100p - zzBits90p; @@ -524,11 +550,11 @@ private void determineEncoding() { // 95th percentile width is used to determine max allowed value // after which patching will be done p = 0.95; - brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p); + brBits95p = utils.percentileBits(baseRedLiterals, p); // 100th percentile is used to compute the max patch width p = 1.0; - brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p); + brBits100p = utils.percentileBits(baseRedLiterals, p); // after base reducing the values, if the difference in bits between // 95th percentile and 100th percentile value is zero then there @@ -573,7 +599,7 @@ private void preparePatchedBlob() { // #bit for patch patchWidth = brBits100p - brBits95p; - patchWidth = SerializationUtils.getClosestFixedBits(patchWidth); + patchWidth = 
utils.getClosestFixedBits(patchWidth); // if patch bit requirement is 64 then it will not possible to pack // gap and patch together in a long. To make sure gap and patch can be @@ -619,7 +645,7 @@ private void preparePatchedBlob() { if (maxGap == 0 && patchLength != 0) { patchGapWidth = 1; } else { - patchGapWidth = SerializationUtils.findClosestNumBits(maxGap); + patchGapWidth = utils.findClosestNumBits(maxGap); } // special case: if the patch gap width is greater than 256, then diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java index 20c8c6d..b8af7f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java @@ -26,10 +26,16 @@ final class SerializationUtils { - // unused - private SerializationUtils() {} + private final static int BUFFER_SIZE = 64; + private final byte[] readBuffer; + private final byte[] writeBuffer; - static void writeVulong(OutputStream output, long value) throws IOException { + public SerializationUtils() { + this.readBuffer = new byte[BUFFER_SIZE]; + this.writeBuffer = new byte[BUFFER_SIZE]; + } + + void writeVulong(OutputStream output, long value) throws IOException { while (true) { if ((value & ~0x7f) == 0) { output.write((byte) value); @@ -41,12 +47,12 @@ static void writeVulong(OutputStream output, long value) throws IOException { } } - static void writeVslong(OutputStream output, long value) throws IOException { + void writeVslong(OutputStream output, long value) throws IOException { writeVulong(output, (value << 1) ^ (value >> 63)); } - static long readVulong(InputStream in) throws IOException { + long readVulong(InputStream in) throws IOException { long result = 0; long b; int offset = 0; @@ -61,18 +67,18 @@ static long readVulong(InputStream in) throws IOException { return result; } - static long readVslong(InputStream in) throws IOException { 
+ long readVslong(InputStream in) throws IOException { long result = readVulong(in); return (result >>> 1) ^ -(result & 1); } - static float readFloat(InputStream in) throws IOException { + float readFloat(InputStream in) throws IOException { int ser = in.read() | (in.read() << 8) | (in.read() << 16) | (in.read() << 24); return Float.intBitsToFloat(ser); } - static void writeFloat(OutputStream output, float value) throws IOException { + void writeFloat(OutputStream output, float value) throws IOException { int ser = Float.floatToIntBits(value); output.write(ser & 0xff); output.write((ser >> 8) & 0xff); @@ -80,29 +86,36 @@ static void writeFloat(OutputStream output, float value) throws IOException { output.write((ser >> 24) & 0xff); } - static double readDouble(InputStream in) throws IOException { - long ser = (long) in.read() | - ((long) in.read() << 8) | - ((long) in.read() << 16) | - ((long) in.read() << 24) | - ((long) in.read() << 32) | - ((long) in.read() << 40) | - ((long) in.read() << 48) | - ((long) in.read() << 56); - return Double.longBitsToDouble(ser); + double readDouble(InputStream in) throws IOException { + return Double.longBitsToDouble(readLongLE(in)); + } + + long readLongLE(InputStream in) throws IOException { + in.read(readBuffer, 0, 8); + return (((readBuffer[0] & 255) << 0) /* mask byte 0 as well: an unmasked byte >= 0x80 sign-extends to a negative int and corrupts the little-endian sum */ + + ((readBuffer[1] & 255) << 8) + + ((readBuffer[2] & 255) << 16) + + ((long) (readBuffer[3] & 255) << 24) + + ((long) (readBuffer[4] & 255) << 32) + + ((long) (readBuffer[5] & 255) << 40) + + ((long) (readBuffer[6] & 255) << 48) + + ((long) (readBuffer[7] & 255) << 56)); } - static void writeDouble(OutputStream output, - double value) throws IOException { - long ser = Double.doubleToLongBits(value); - output.write(((int) ser) & 0xff); - output.write(((int) (ser >> 8)) & 0xff); - output.write(((int) (ser >> 16)) & 0xff); - output.write(((int) (ser >> 24)) & 0xff); - output.write(((int) (ser >> 32)) & 0xff); - output.write(((int) (ser >> 40)) & 0xff); - output.write(((int) (ser >> 
48)) & 0xff); - output.write(((int) (ser >> 56)) & 0xff); + void writeDouble(OutputStream output, double value) throws IOException { + writeLongLE(output, Double.doubleToLongBits(value)); + } + + private void writeLongLE(OutputStream output, long value) throws IOException { + writeBuffer[0] = (byte) (value >>> 0); + writeBuffer[1] = (byte) (value >>> 8); + writeBuffer[2] = (byte) (value >>> 16); + writeBuffer[3] = (byte) (value >>> 24); + writeBuffer[4] = (byte) (value >>> 32); + writeBuffer[5] = (byte) (value >>> 40); + writeBuffer[6] = (byte) (value >>> 48); + writeBuffer[7] = (byte) (value >>> 56); + output.write(writeBuffer, 0, 8); } /** @@ -198,7 +211,7 @@ static BigInteger readBigInteger(InputStream input) throws IOException { * @param value * @return bits required to store value */ - static int findClosestNumBits(long value) { + int findClosestNumBits(long value) { int count = 0; while (value != 0) { count++; @@ -212,7 +225,7 @@ static int findClosestNumBits(long value) { * @param val * @return zigzag encoded value */ - static long zigzagEncode(long val) { + long zigzagEncode(long val) { return (val << 1) ^ (val >> 63); } @@ -221,7 +234,7 @@ static long zigzagEncode(long val) { * @param val * @return zizag decoded value */ - static long zigzagDecode(long val) { + long zigzagDecode(long val) { return (val >>> 1) ^ -(val & 1); } @@ -231,7 +244,7 @@ static long zigzagDecode(long val) { * @param p - percentile value (>=0.0 to <=1.0) * @return pth percentile bits */ - static int percentileBits(long[] data, double p) { + int percentileBits(long[] data, double p) { if ((p > 1.0) || (p <= 0.0)) { return -1; } @@ -265,7 +278,7 @@ static int percentileBits(long[] data, double p) { * @param b - byte array * @return long value */ - static long bytesToLongBE(InStream input, int n) throws IOException { + long bytesToLongBE(InStream input, int n) throws IOException { long out = 0; long val = 0; while (n > 0) { @@ -283,7 +296,7 @@ static long bytesToLongBE(InStream input, 
int n) throws IOException { * @param numBits - bit width * @return number of bytes required */ - static int getTotalBytesRequired(int n, int numBits) { + int getTotalBytesRequired(int n, int numBits) { return (n * numBits + 7) / 8; } @@ -293,7 +306,7 @@ static int getTotalBytesRequired(int n, int numBits) { * @param n * @return closest valid fixed bit */ - static int getClosestFixedBits(int n) { + int getClosestFixedBits(int n) { if (n == 0) { return 1; } @@ -319,13 +332,39 @@ static int getClosestFixedBits(int n) { } } + public int getClosestAlignedFixedBits(int n) { + if (n == 0 || n == 1) { + return 1; + } else if (n > 1 && n <= 2) { + return 2; + } else if (n > 2 && n <= 4) { + return 4; + } else if (n > 4 && n <= 8) { + return 8; + } else if (n > 8 && n <= 16) { + return 16; + } else if (n > 16 && n <= 24) { + return 24; + } else if (n > 24 && n <= 32) { + return 32; + } else if (n > 32 && n <= 40) { + return 40; + } else if (n > 40 && n <= 48) { + return 48; + } else if (n > 48 && n <= 56) { + return 56; + } else { + return 64; + } + } + /** * Finds the closest available fixed bit width match and returns its encoded * value (ordinal) * @param n - fixed bit width to encode * @return encoded fixed bit width */ - static int encodeBitWidth(int n) { + int encodeBitWidth(int n) { n = getClosestFixedBits(n); if (n >= 1 && n <= 24) { @@ -354,7 +393,7 @@ static int encodeBitWidth(int n) { * @param n - encoded fixed bit width * @return decoded fixed bit width */ - static int decodeBitWidth(int n) { + int decodeBitWidth(int n) { if (n >= FixedBitSizes.ONE.ordinal() && n <= FixedBitSizes.TWENTYFOUR.ordinal()) { return n + 1; @@ -386,13 +425,51 @@ static int decodeBitWidth(int n) { * @param output - output stream * @throws IOException */ - static void writeInts(long[] input, int offset, int len, int bitSize, + void writeInts(long[] input, int offset, int len, int bitSize, OutputStream output) throws IOException { if (input == null || input.length < 1 || offset < 0 || len 
< 1 || bitSize < 1) { return; } + switch (bitSize) { + case 1: + unrolledBitPack1(input, offset, len, output); + return; + case 2: + unrolledBitPack2(input, offset, len, output); + return; + case 4: + unrolledBitPack4(input, offset, len, output); + return; + case 8: + unrolledBitPack8(input, offset, len, output); + return; + case 16: + unrolledBitPack16(input, offset, len, output); + return; + case 24: + unrolledBitPack24(input, offset, len, output); + return; + case 32: + unrolledBitPack32(input, offset, len, output); + return; + case 40: + unrolledBitPack40(input, offset, len, output); + return; + case 48: + unrolledBitPack48(input, offset, len, output); + return; + case 56: + unrolledBitPack56(input, offset, len, output); + return; + case 64: + unrolledBitPack64(input, offset, len, output); + return; + default: + break; + } + int bitsLeft = 8; byte current = 0; for(int i = offset; i < (offset + len); i++) { @@ -426,6 +503,357 @@ static void writeInts(long[] input, int offset, int len, int bitSize, } } + private void unrolledBitPack1(long[] input, int offset, int len, + OutputStream output) throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = (int) (val | ((input[i] & 1) << 7) + | ((input[i + 1] & 1) << 6) + | ((input[i + 2] & 1) << 5) + | ((input[i + 3] & 1) << 4) + | ((input[i + 4] & 1) << 3) + | ((input[i + 5] & 1) << 2) + | ((input[i + 6] & 1) << 1) + | (input[i + 7]) & 1); + output.write(val); + val = 0; + } + + if (remainder > 0) { + int startShift = 7; + for (int i = endUnroll; i < endOffset; i++) { + val = (int) (val | (input[i] & 1) << startShift); + startShift -= 1; + } + output.write(val); + } + } + + private void unrolledBitPack2(long[] input, int offset, int len, + OutputStream output) throws IOException { + final int numHops = 4; + final int remainder = 
len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = (int) (val | ((input[i] & 3) << 6) + | ((input[i + 1] & 3) << 4) + | ((input[i + 2] & 3) << 2) + | (input[i + 3]) & 3); + output.write(val); + val = 0; + } + + if (remainder > 0) { + int startShift = 6; + for (int i = endUnroll; i < endOffset; i++) { + val = (int) (val | (input[i] & 3) << startShift); + startShift -= 2; + } + output.write(val); + } + } + + private void unrolledBitPack4(long[] input, int offset, int len, + OutputStream output) throws IOException { + final int numHops = 2; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15); + output.write(val); + val = 0; + } + + if (remainder > 0) { + int startShift = 4; + for (int i = endUnroll; i < endOffset; i++) { + val = (int) (val | (input[i] & 15) << startShift); + startShift -= 4; + } + output.write(val); + } + } + + private void unrolledBitPack8(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 1); + } + + private void unrolledBitPack16(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 2); + } + + private void unrolledBitPack24(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 3); + } + + private void unrolledBitPack32(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 4); + } + + private void unrolledBitPack40(long[] input, int offset, int len, + OutputStream output) throws IOException { + 
unrolledBitPackBytes(input, offset, len, output, 5); + } + + private void unrolledBitPack48(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 6); + } + + private void unrolledBitPack56(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 7); + } + + private void unrolledBitPack64(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 8); + } + + private void unrolledBitPackBytes(long[] input, int offset, int len, OutputStream output, int numBytes) throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int i = offset; + for (; i < endUnroll; i = i + numHops) { + writeLongBE(output, input, i, numHops, numBytes); + } + + if (remainder > 0) { + writeRemainingLongs(output, i, input, remainder, numBytes); + } + } + + private void writeRemainingLongs(OutputStream output, int offset, long[] input, int remainder, + int numBytes) throws IOException { + final int numHops = remainder; + + int idx = 0; + switch (numBytes) { + case 1: + while (remainder > 0) { + writeBuffer[idx] = (byte) (input[offset + idx] & 255); + remainder--; + idx++; + } + break; + case 2: + while (remainder > 0) { + writeLongBE2(output, input[offset + idx], idx * 2); + remainder--; + idx++; + } + break; + case 3: + while (remainder > 0) { + writeLongBE3(output, input[offset + idx], idx * 3); + remainder--; + idx++; + } + break; + case 4: + while (remainder > 0) { + writeLongBE4(output, input[offset + idx], idx * 4); + remainder--; + idx++; + } + break; + case 5: + while (remainder > 0) { + writeLongBE5(output, input[offset + idx], idx * 5); + remainder--; + idx++; + } + break; + case 6: + while (remainder > 0) { + writeLongBE6(output, input[offset + idx], idx * 
6); + remainder--; + idx++; + } + break; + case 7: + while (remainder > 0) { + writeLongBE7(output, input[offset + idx], idx * 7); + remainder--; + idx++; + } + break; + case 8: + while (remainder > 0) { + writeLongBE8(output, input[offset + idx], idx * 8); + remainder--; + idx++; + } + break; + default: + break; + } + + final int toWrite = numHops * numBytes; + output.write(writeBuffer, 0, toWrite); + } + + private void writeLongBE(OutputStream output, long[] input, int offset, int numHops, int numBytes) throws IOException { + + switch (numBytes) { + case 1: + writeBuffer[0] = (byte) (input[offset + 0] & 255); + writeBuffer[1] = (byte) (input[offset + 1] & 255); + writeBuffer[2] = (byte) (input[offset + 2] & 255); + writeBuffer[3] = (byte) (input[offset + 3] & 255); + writeBuffer[4] = (byte) (input[offset + 4] & 255); + writeBuffer[5] = (byte) (input[offset + 5] & 255); + writeBuffer[6] = (byte) (input[offset + 6] & 255); + writeBuffer[7] = (byte) (input[offset + 7] & 255); + break; + case 2: + writeLongBE2(output, input[offset + 0], 0); + writeLongBE2(output, input[offset + 1], 2); + writeLongBE2(output, input[offset + 2], 4); + writeLongBE2(output, input[offset + 3], 6); + writeLongBE2(output, input[offset + 4], 8); + writeLongBE2(output, input[offset + 5], 10); + writeLongBE2(output, input[offset + 6], 12); + writeLongBE2(output, input[offset + 7], 14); + break; + case 3: + writeLongBE3(output, input[offset + 0], 0); + writeLongBE3(output, input[offset + 1], 3); + writeLongBE3(output, input[offset + 2], 6); + writeLongBE3(output, input[offset + 3], 9); + writeLongBE3(output, input[offset + 4], 12); + writeLongBE3(output, input[offset + 5], 15); + writeLongBE3(output, input[offset + 6], 18); + writeLongBE3(output, input[offset + 7], 21); + break; + case 4: + writeLongBE4(output, input[offset + 0], 0); + writeLongBE4(output, input[offset + 1], 4); + writeLongBE4(output, input[offset + 2], 8); + writeLongBE4(output, input[offset + 3], 12); + writeLongBE4(output, 
input[offset + 4], 16); + writeLongBE4(output, input[offset + 5], 20); + writeLongBE4(output, input[offset + 6], 24); + writeLongBE4(output, input[offset + 7], 28); + break; + case 5: + writeLongBE5(output, input[offset + 0], 0); + writeLongBE5(output, input[offset + 1], 5); + writeLongBE5(output, input[offset + 2], 10); + writeLongBE5(output, input[offset + 3], 15); + writeLongBE5(output, input[offset + 4], 20); + writeLongBE5(output, input[offset + 5], 25); + writeLongBE5(output, input[offset + 6], 30); + writeLongBE5(output, input[offset + 7], 35); + break; + case 6: + writeLongBE6(output, input[offset + 0], 0); + writeLongBE6(output, input[offset + 1], 6); + writeLongBE6(output, input[offset + 2], 12); + writeLongBE6(output, input[offset + 3], 18); + writeLongBE6(output, input[offset + 4], 24); + writeLongBE6(output, input[offset + 5], 30); + writeLongBE6(output, input[offset + 6], 36); + writeLongBE6(output, input[offset + 7], 42); + break; + case 7: + writeLongBE7(output, input[offset + 0], 0); + writeLongBE7(output, input[offset + 1], 7); + writeLongBE7(output, input[offset + 2], 14); + writeLongBE7(output, input[offset + 3], 21); + writeLongBE7(output, input[offset + 4], 28); + writeLongBE7(output, input[offset + 5], 35); + writeLongBE7(output, input[offset + 6], 42); + writeLongBE7(output, input[offset + 7], 49); + break; + case 8: + writeLongBE8(output, input[offset + 0], 0); + writeLongBE8(output, input[offset + 1], 8); + writeLongBE8(output, input[offset + 2], 16); + writeLongBE8(output, input[offset + 3], 24); + writeLongBE8(output, input[offset + 4], 32); + writeLongBE8(output, input[offset + 5], 40); + writeLongBE8(output, input[offset + 6], 48); + writeLongBE8(output, input[offset + 7], 56); + break; + default: + break; + } + + final int toWrite = numHops * numBytes; + output.write(writeBuffer, 0, toWrite); + } + + private void writeLongBE2(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 8); + 
writeBuffer[wbOffset + 1] = (byte) (val >>> 0); + } + + private void writeLongBE3(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 16); + writeBuffer[wbOffset + 1] = (byte) (val >>> 8); + writeBuffer[wbOffset + 2] = (byte) (val >>> 0); + } + + private void writeLongBE4(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 24); + writeBuffer[wbOffset + 1] = (byte) (val >>> 16); + writeBuffer[wbOffset + 2] = (byte) (val >>> 8); + writeBuffer[wbOffset + 3] = (byte) (val >>> 0); + } + + private void writeLongBE5(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 32); + writeBuffer[wbOffset + 1] = (byte) (val >>> 24); + writeBuffer[wbOffset + 2] = (byte) (val >>> 16); + writeBuffer[wbOffset + 3] = (byte) (val >>> 8); + writeBuffer[wbOffset + 4] = (byte) (val >>> 0); + } + + private void writeLongBE6(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 40); + writeBuffer[wbOffset + 1] = (byte) (val >>> 32); + writeBuffer[wbOffset + 2] = (byte) (val >>> 24); + writeBuffer[wbOffset + 3] = (byte) (val >>> 16); + writeBuffer[wbOffset + 4] = (byte) (val >>> 8); + writeBuffer[wbOffset + 5] = (byte) (val >>> 0); + } + + private void writeLongBE7(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 48); + writeBuffer[wbOffset + 1] = (byte) (val >>> 40); + writeBuffer[wbOffset + 2] = (byte) (val >>> 32); + writeBuffer[wbOffset + 3] = (byte) (val >>> 24); + writeBuffer[wbOffset + 4] = (byte) (val >>> 16); + writeBuffer[wbOffset + 5] = (byte) (val >>> 8); + writeBuffer[wbOffset + 6] = (byte) (val >>> 0); + } + + private void writeLongBE8(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 56); + writeBuffer[wbOffset + 1] = (byte) (val >>> 48); + writeBuffer[wbOffset + 2] = (byte) (val >>> 40); + writeBuffer[wbOffset + 3] = (byte) (val >>> 
32); + writeBuffer[wbOffset + 4] = (byte) (val >>> 24); + writeBuffer[wbOffset + 5] = (byte) (val >>> 16); + writeBuffer[wbOffset + 6] = (byte) (val >>> 8); + writeBuffer[wbOffset + 7] = (byte) (val >>> 0); + } + /** * Read bitpacked integers from input stream * @param buffer - input buffer @@ -435,11 +863,49 @@ static void writeInts(long[] input, int offset, int len, int bitSize, * @param input - input stream * @throws IOException */ - static void readInts(long[] buffer, int offset, int len, int bitSize, + void readInts(long[] buffer, int offset, int len, int bitSize, InStream input) throws IOException { int bitsLeft = 0; int current = 0; + switch (bitSize) { + case 1: + unrolledUnPack1(buffer, offset, len, input); + return; + case 2: + unrolledUnPack2(buffer, offset, len, input); + return; + case 4: + unrolledUnPack4(buffer, offset, len, input); + return; + case 8: + unrolledUnPack8(buffer, offset, len, input); + return; + case 16: + unrolledUnPack16(buffer, offset, len, input); + return; + case 24: + unrolledUnPack24(buffer, offset, len, input); + return; + case 32: + unrolledUnPack32(buffer, offset, len, input); + return; + case 40: + unrolledUnPack40(buffer, offset, len, input); + return; + case 48: + unrolledUnPack48(buffer, offset, len, input); + return; + case 56: + unrolledUnPack56(buffer, offset, len, input); + return; + case 64: + unrolledUnPack64(buffer, offset, len, input); + return; + default: + break; + } + for(int i = offset; i < (offset + len); i++) { long result = 0; int bitsLeftToRead = bitSize; @@ -460,4 +926,362 @@ static void readInts(long[] buffer, int offset, int len, int bitSize, buffer[i] = result; } } + + + private void unrolledUnPack1(long[] buffer, int offset, int len, + InStream input) throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = 
input.read(); + buffer[i] = (val >>> 7) & 1; + buffer[i + 1] = (val >>> 6) & 1; + buffer[i + 2] = (val >>> 5) & 1; + buffer[i + 3] = (val >>> 4) & 1; + buffer[i + 4] = (val >>> 3) & 1; + buffer[i + 5] = (val >>> 2) & 1; + buffer[i + 6] = (val >>> 1) & 1; + buffer[i + 7] = val & 1; + } + + if (remainder > 0) { + int startShift = 7; + val = input.read(); + for (int i = endUnroll; i < endOffset; i++) { + buffer[i] = (val >>> startShift) & 1; + startShift -= 1; + } + } + } + + private void unrolledUnPack2(long[] buffer, int offset, int len, + InStream input) throws IOException { + final int numHops = 4; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = input.read(); + buffer[i] = (val >>> 6) & 3; + buffer[i + 1] = (val >>> 4) & 3; + buffer[i + 2] = (val >>> 2) & 3; + buffer[i + 3] = val & 3; + } + + if (remainder > 0) { + int startShift = 6; + val = input.read(); + for (int i = endUnroll; i < endOffset; i++) { + buffer[i] = (val >>> startShift) & 3; + startShift -= 2; + } + } + } + + private void unrolledUnPack4(long[] buffer, int offset, int len, + InStream input) throws IOException { + final int numHops = 2; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = input.read(); + buffer[i] = (val >>> 4) & 15; + buffer[i + 1] = val & 15; + } + + if (remainder > 0) { + int startShift = 4; + val = input.read(); + for (int i = endUnroll; i < endOffset; i++) { + buffer[i] = (val >>> startShift) & 15; + startShift -= 4; + } + } + } + + private void unrolledUnPack8(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 1); + } + + private void unrolledUnPack16(long[] buffer, int offset, int len, + InStream 
input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 2); + } + + private void unrolledUnPack24(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 3); + } + + private void unrolledUnPack32(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 4); + } + + private void unrolledUnPack40(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 5); + } + + private void unrolledUnPack48(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 6); + } + + private void unrolledUnPack56(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 7); + } + + private void unrolledUnPack64(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 8); + } + + private void unrolledUnPackBytes(long[] buffer, int offset, int len, InStream input, int numBytes) + throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int i = offset; + for (; i < endUnroll; i = i + numHops) { + readLongBE(input, buffer, i, numHops, numBytes); + } + + if (remainder > 0) { + readRemainingLongs(buffer, i, input, remainder, numBytes); + } + } + + private void readRemainingLongs(long[] buffer, int offset, InStream input, int remainder, + int numBytes) throws IOException { + final int toRead = remainder * numBytes; + // bulk read to buffer + int bytesRead = input.read(readBuffer, 0, toRead); + while (bytesRead != toRead) { + bytesRead += input.read(readBuffer, bytesRead, toRead - bytesRead); + } + + int idx = 0; + switch (numBytes) { + case 1: + while (remainder 
> 0) { + buffer[offset++] = readBuffer[idx] & 255; + remainder--; + idx++; + } + break; + case 2: + while (remainder > 0) { + buffer[offset++] = readLongBE2(input, idx * 2); + remainder--; + idx++; + } + break; + case 3: + while (remainder > 0) { + buffer[offset++] = readLongBE3(input, idx * 3); + remainder--; + idx++; + } + break; + case 4: + while (remainder > 0) { + buffer[offset++] = readLongBE4(input, idx * 4); + remainder--; + idx++; + } + break; + case 5: + while (remainder > 0) { + buffer[offset++] = readLongBE5(input, idx * 5); + remainder--; + idx++; + } + break; + case 6: + while (remainder > 0) { + buffer[offset++] = readLongBE6(input, idx * 6); + remainder--; + idx++; + } + break; + case 7: + while (remainder > 0) { + buffer[offset++] = readLongBE7(input, idx * 7); + remainder--; + idx++; + } + break; + case 8: + while (remainder > 0) { + buffer[offset++] = readLongBE8(input, idx * 8); + remainder--; + idx++; + } + break; + default: + break; + } + } + + private void readLongBE(InStream in, long[] buffer, int start, int numHops, int numBytes) + throws IOException { + final int toRead = numHops * numBytes; + // bulk read to buffer + int bytesRead = in.read(readBuffer, 0, toRead); + while (bytesRead != toRead) { + bytesRead += in.read(readBuffer, bytesRead, toRead - bytesRead); + } + + switch (numBytes) { + case 1: + buffer[start + 0] = readBuffer[0] & 255; + buffer[start + 1] = readBuffer[1] & 255; + buffer[start + 2] = readBuffer[2] & 255; + buffer[start + 3] = readBuffer[3] & 255; + buffer[start + 4] = readBuffer[4] & 255; + buffer[start + 5] = readBuffer[5] & 255; + buffer[start + 6] = readBuffer[6] & 255; + buffer[start + 7] = readBuffer[7] & 255; + break; + case 2: + buffer[start + 0] = readLongBE2(in, 0); + buffer[start + 1] = readLongBE2(in, 2); + buffer[start + 2] = readLongBE2(in, 4); + buffer[start + 3] = readLongBE2(in, 6); + buffer[start + 4] = readLongBE2(in, 8); + buffer[start + 5] = readLongBE2(in, 10); + buffer[start + 6] = 
readLongBE2(in, 12); + buffer[start + 7] = readLongBE2(in, 14); + break; + case 3: + buffer[start + 0] = readLongBE3(in, 0); + buffer[start + 1] = readLongBE3(in, 3); + buffer[start + 2] = readLongBE3(in, 6); + buffer[start + 3] = readLongBE3(in, 9); + buffer[start + 4] = readLongBE3(in, 12); + buffer[start + 5] = readLongBE3(in, 15); + buffer[start + 6] = readLongBE3(in, 18); + buffer[start + 7] = readLongBE3(in, 21); + break; + case 4: + buffer[start + 0] = readLongBE4(in, 0); + buffer[start + 1] = readLongBE4(in, 4); + buffer[start + 2] = readLongBE4(in, 8); + buffer[start + 3] = readLongBE4(in, 12); + buffer[start + 4] = readLongBE4(in, 16); + buffer[start + 5] = readLongBE4(in, 20); + buffer[start + 6] = readLongBE4(in, 24); + buffer[start + 7] = readLongBE4(in, 28); + break; + case 5: + buffer[start + 0] = readLongBE5(in, 0); + buffer[start + 1] = readLongBE5(in, 5); + buffer[start + 2] = readLongBE5(in, 10); + buffer[start + 3] = readLongBE5(in, 15); + buffer[start + 4] = readLongBE5(in, 20); + buffer[start + 5] = readLongBE5(in, 25); + buffer[start + 6] = readLongBE5(in, 30); + buffer[start + 7] = readLongBE5(in, 35); + break; + case 6: + buffer[start + 0] = readLongBE6(in, 0); + buffer[start + 1] = readLongBE6(in, 6); + buffer[start + 2] = readLongBE6(in, 12); + buffer[start + 3] = readLongBE6(in, 18); + buffer[start + 4] = readLongBE6(in, 24); + buffer[start + 5] = readLongBE6(in, 30); + buffer[start + 6] = readLongBE6(in, 36); + buffer[start + 7] = readLongBE6(in, 42); + break; + case 7: + buffer[start + 0] = readLongBE7(in, 0); + buffer[start + 1] = readLongBE7(in, 7); + buffer[start + 2] = readLongBE7(in, 14); + buffer[start + 3] = readLongBE7(in, 21); + buffer[start + 4] = readLongBE7(in, 28); + buffer[start + 5] = readLongBE7(in, 35); + buffer[start + 6] = readLongBE7(in, 42); + buffer[start + 7] = readLongBE7(in, 49); + break; + case 8: + buffer[start + 0] = readLongBE8(in, 0); + buffer[start + 1] = readLongBE8(in, 8); + buffer[start + 2] = 
readLongBE8(in, 16); + buffer[start + 3] = readLongBE8(in, 24); + buffer[start + 4] = readLongBE8(in, 32); + buffer[start + 5] = readLongBE8(in, 40); + buffer[start + 6] = readLongBE8(in, 48); + buffer[start + 7] = readLongBE8(in, 56); + break; + default: + break; + } + } + + private long readLongBE2(InStream in, int rbOffset) { + return (((readBuffer[rbOffset] & 255) << 8) + + ((readBuffer[rbOffset + 1] & 255) << 0)); + } + + private long readLongBE3(InStream in, int rbOffset) { + return (((readBuffer[rbOffset] & 255) << 16) + + ((readBuffer[rbOffset + 1] & 255) << 8) + + ((readBuffer[rbOffset + 2] & 255) << 0)); + } + + private long readLongBE4(InStream in, int rbOffset) { + return (((long) (readBuffer[rbOffset] & 255) << 24) + + ((readBuffer[rbOffset + 1] & 255) << 16) + + ((readBuffer[rbOffset + 2] & 255) << 8) + + ((readBuffer[rbOffset + 3] & 255) << 0)); + } + + private long readLongBE5(InStream in, int rbOffset) { + return (((long) (readBuffer[rbOffset] & 255) << 32) + + ((long) (readBuffer[rbOffset + 1] & 255) << 24) + + ((readBuffer[rbOffset + 2] & 255) << 16) + + ((readBuffer[rbOffset + 3] & 255) << 8) + + ((readBuffer[rbOffset + 4] & 255) << 0)); + } + + private long readLongBE6(InStream in, int rbOffset) { + return (((long) (readBuffer[rbOffset] & 255) << 40) + + ((long) (readBuffer[rbOffset + 1] & 255) << 32) + + ((long) (readBuffer[rbOffset + 2] & 255) << 24) + + ((readBuffer[rbOffset + 3] & 255) << 16) + + ((readBuffer[rbOffset + 4] & 255) << 8) + + ((readBuffer[rbOffset + 5] & 255) << 0)); + } + + private long readLongBE7(InStream in, int rbOffset) { + return (((long) (readBuffer[rbOffset] & 255) << 48) + + ((long) (readBuffer[rbOffset + 1] & 255) << 40) + + ((long) (readBuffer[rbOffset + 2] & 255) << 32) + + ((long) (readBuffer[rbOffset + 3] & 255) << 24) + + ((readBuffer[rbOffset + 4] & 255) << 16) + + ((readBuffer[rbOffset + 5] & 255) << 8) + + ((readBuffer[rbOffset + 6] & 255) << 0)); + } + + private long readLongBE8(InStream in, int rbOffset) { 
+ return (((long) (readBuffer[rbOffset] & 255) << 56) + + ((long) (readBuffer[rbOffset + 1] & 255) << 48) + + ((long) (readBuffer[rbOffset + 2] & 255) << 40) + + ((long) (readBuffer[rbOffset + 3] & 255) << 32) + + ((long) (readBuffer[rbOffset + 4] & 255) << 24) + + ((readBuffer[rbOffset + 5] & 255) << 16) + + ((readBuffer[rbOffset + 6] & 255) << 8) + + ((readBuffer[rbOffset + 7] & 255) << 0)); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index b3ab96f..e6f8760 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -28,6 +28,7 @@ import java.util.TreeMap; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry; import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; @@ -134,6 +136,7 @@ private final Configuration conf; private final OrcFile.WriterCallback callback; private final OrcFile.WriterContext callbackContext; + private final OrcFile.EncodingStrategy encodingStrategy; WriterImpl(FileSystem fs, Path path, @@ -146,7 +149,8 @@ MemoryManager memoryManager, boolean addBlockPadding, OrcFile.Version version, - OrcFile.WriterCallback callback) throws IOException { + OrcFile.WriterCallback callback, + OrcFile.EncodingStrategy encodingStrategy) throws IOException { this.fs = fs; this.path = path; this.conf = conf; @@ -164,6 +168,7 @@ public Writer getWriter() { } this.stripeSize = stripeSize; this.version = version; + 
this.encodingStrategy = encodingStrategy; this.addBlockPadding = addBlockPadding; // pick large block size to minimize block over or under hangs this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize); @@ -404,6 +409,14 @@ public boolean isCompressed() { } /** + * Get the encoding strategy to use. + * @return encoding strategy + */ + public EncodingStrategy getEncodingStrategy() { + return encodingStrategy; + } + + /** * Get the writer's configuration. * @return configuration */ @@ -497,9 +510,14 @@ protected ColumnStatisticsImpl getFileStatistics() { } IntegerWriter createIntegerWriter(PositionedOutputStream output, - boolean signed, boolean isDirectV2) { + boolean signed, boolean isDirectV2, + StreamFactory writer) { if (isDirectV2) { - return new RunLengthIntegerWriterV2(output, signed); + boolean alignedBitpacking = false; + if (writer.getEncodingStrategy().equals(EncodingStrategy.BEST_SPEED)) { + alignedBitpacking = true; + } + return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking); } else { return new RunLengthIntegerWriter(output, signed); } @@ -744,7 +762,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); - this.writer = createIntegerWriter(out, true, isDirectV2); + this.writer = createIntegerWriter(out, true, isDirectV2, writer); if (inspector instanceof IntObjectInspector) { intInspector = (IntObjectInspector) inspector; shortInspector = null; @@ -806,6 +824,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class FloatTreeWriter extends TreeWriter { private final PositionedOutputStream stream; + private final SerializationUtils utils; FloatTreeWriter(int columnId, ObjectInspector inspector, @@ -814,6 +833,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); this.stream = 
writer.createStream(id, OrcProto.Stream.Kind.DATA); + this.utils = new SerializationUtils(); recordPosition(rowIndexPosition); } @@ -823,7 +843,7 @@ void write(Object obj) throws IOException { if (obj != null) { float val = ((FloatObjectInspector) inspector).get(obj); indexStatistics.updateDouble(val); - SerializationUtils.writeFloat(stream, val); + utils.writeFloat(stream, val); } } @@ -844,6 +864,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class DoubleTreeWriter extends TreeWriter { private final PositionedOutputStream stream; + private final SerializationUtils utils; DoubleTreeWriter(int columnId, ObjectInspector inspector, @@ -852,6 +873,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); + this.utils = new SerializationUtils(); recordPosition(rowIndexPosition); } @@ -861,7 +883,7 @@ void write(Object obj) throws IOException { if (obj != null) { double val = ((DoubleObjectInspector) inspector).get(obj); indexStatistics.updateDouble(val); - SerializationUtils.writeDouble(stream, val); + utils.writeDouble(stream, val); } } @@ -909,15 +931,15 @@ void recordPosition(PositionRecorder recorder) throws IOException { stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); lengthOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2); + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); rowOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), false, isDirectV2); + OrcProto.Stream.Kind.DATA), false, isDirectV2, writer); recordPosition(rowIndexPosition); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); directLengthOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false, 
isDirectV2); + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); dictionaryKeySizeThreshold = writer.getConfiguration().getFloat( HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD. @@ -1129,7 +1151,7 @@ String getStringValue(Object obj) { OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.length = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2); + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); } @@ -1188,9 +1210,9 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, inspector, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); this.seconds = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), true, isDirectV2); + OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); this.nanos = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), false, isDirectV2); + OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer); recordPosition(rowIndexPosition); } @@ -1261,7 +1283,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); - this.writer = createIntegerWriter(out, true, isDirectV2); + this.writer = createIntegerWriter(out, true, isDirectV2, writer); recordPosition(rowIndexPosition); } @@ -1314,7 +1336,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { this.isDirectV2 = isNewWriteFormat(writer); valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.scaleStream = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.SECONDARY), true, isDirectV2); + OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer); recordPosition(rowIndexPosition); } @@ -1419,7 +1441,7 @@ void 
writeStripe(OrcProto.StripeFooter.Builder builder, createTreeWriter(listObjectInspector.getListElementObjectInspector(), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2); + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); } @@ -1481,7 +1503,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { childrenWriters[1] = createTreeWriter(insp.getMapValueObjectInspector(), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2); + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java index d389525..41a807b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java @@ -20,24 +20,54 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import com.google.common.collect.Lists; import com.google.common.primitives.Longs; public class TestBitPack { private static final int SIZE = 100; private static Random rand = new Random(100); + Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + 
File.separator + "test" + + File.separator + "tmp")); + + Configuration conf; + FileSystem fs; + Path testFilePath; + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } private long[] deltaEncode(long[] inp) { long[] output = new long[inp.length]; - for(int i = 0; i < inp.length; i++) { - output[i] = SerializationUtils.zigzagEncode(inp[i]); + SerializationUtils utils = new SerializationUtils(); + for (int i = 0; i < inp.length; i++) { + output[i] = utils.zigzagEncode(inp[i]); } return output; } @@ -53,7 +83,7 @@ private long nextLong(Random rng, long n) { private void runTest(int numBits) throws IOException { long[] inp = new long[SIZE]; - for(int i = 0; i < SIZE; i++) { + for (int i = 0; i < SIZE; i++) { long val = 0; if (numBits <= 32) { if (numBits == 1) { @@ -73,24 +103,20 @@ private void runTest(int numBits) throws IOException { long minInput = Collections.min(Longs.asList(deltaEncoded)); long maxInput = Collections.max(Longs.asList(deltaEncoded)); long rangeInput = maxInput - minInput; - int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput); + SerializationUtils utils = new SerializationUtils(); + int fixedWidth = utils.findClosestNumBits(rangeInput); TestInStream.OutputCollector collect = new TestInStream.OutputCollector(); OutStream output = new OutStream("test", SIZE, null, collect); - SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length, - fixedWidth, output); + utils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output); output.flush(); ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); long[] buff = new long[SIZE]; - SerializationUtils.readInts(buff, 0, SIZE, 
fixedWidth, - InStream.create("test", - new ByteBuffer[]{inBuf}, - new long[]{0}, - inBuf.remaining(), - null, SIZE)); - for(int i = 0; i < SIZE; i++) { - buff[i] = SerializationUtils.zigzagDecode(buff[i]); + utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf }, + new long[] { 0 }, inBuf.remaining(), null, SIZE)); + for (int i = 0; i < SIZE; i++) { + buff[i] = utils.zigzagDecode(buff[i]); } assertEquals(numBits, fixedWidth); assertArrayEquals(inp, buff);
@@ -255,4 +281,46 @@ public void test56BitPacking56Bit() throws IOException {
   public void test64BitPacking64Bit() throws IOException {
     runTest(64);
   }
+
+  /**
+   * Round-trips a large batch of seeded random 64-bit values through an ORC
+   * file compressed with ZLIB and checks every value that is read back.
+   */
+  @Test
+  public void testBitPack64Large() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    int size = 1080832;
+    long[] inp = new long[size];
+    Random rng = new Random(1234);
+    for (int i = 0; i < size; i++) {
+      inp[i] = rng.nextLong();
+    }
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      while (rows.hasNext()) {
+        Object row = rows.next(null);
+        assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+      }
+    } finally {
+      rows.close();
+    }
+    // Without this check the test passes even when the reader returns too few rows.
+    assertEquals(size, idx);
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java index 6869320..3285c69 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java @@ -21,12 +21,15 @@ import
java.io.File; import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy; import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -36,12 +39,29 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; +@RunWith(value = Parameterized.class) public class TestNewIntegerEncoding { + private EncodingStrategy encodingStrategy; + + public TestNewIntegerEncoding(EncodingStrategy es) { + this.encodingStrategy = es; + } + + @Parameters + public static Collection data() { + Object[][] data = new Object[][] { { EncodingStrategy.BEST_COMPRESSION }, + { EncodingStrategy.BEST_SPEED } }; + return Arrays.asList(data); + } + public static class TSRow { Timestamp ts; @@ -92,7 +112,8 @@ public void testBasicRow() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); writer.addRow(new Row(111, 1111L)); writer.addRow(new Row(111, 1111L)); writer.addRow(new Row(111, 1111L)); @@ -127,7 +148,8 @@ public void testBasicOld() throws Exception { .inspector(inspector) .compress(CompressionKind.NONE) .version(OrcFile.Version.V_0_11) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -163,7 +185,8 @@ public void testBasicNew() throws Exception { .inspector(inspector) 
.stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -195,7 +218,8 @@ public void testBasicDelta1() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -227,7 +251,8 @@ public void testBasicDelta2() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -259,7 +284,8 @@ public void testBasicDelta3() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -291,7 +317,8 @@ public void testBasicDelta4() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -322,7 +349,8 @@ public void testIntegerMin() throws Exception { OrcFile.writerOptions(conf) .inspector(inspector) .stripeSize(100000) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -354,7 +382,8 @@ public void testIntegerMax() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -386,7 +415,8 @@ public void testLongMin() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + 
.encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -418,7 +448,8 @@ public void testLongMax() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -453,7 +484,8 @@ public void testRandomInt() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -488,7 +520,8 @@ public void testRandomLong() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -531,7 +564,8 @@ public void testPatchedBaseNegativeMin() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -574,7 +608,8 @@ public void testPatchedBaseNegativeMin2() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -617,7 +652,8 @@ public void testPatchedBaseNegativeMin3() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -651,7 +687,8 @@ public void testPatchedBaseNegativeMin4() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { 
writer.addRow(l); } @@ -687,7 +724,8 @@ public void testPatchedBaseAt0() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -723,7 +761,8 @@ public void testPatchedBaseAt1() throws Exception { .inspector(inspector) .stripeSize(100000) .compress(CompressionKind.NONE) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -758,7 +797,8 @@ public void testPatchedBaseAt255() throws Exception { OrcFile.writerOptions(conf) .inspector(inspector) .stripeSize(100000) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -793,7 +833,8 @@ public void testPatchedBaseAt256() throws Exception { OrcFile.writerOptions(conf) .inspector(inspector) .stripeSize(100000) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -828,7 +869,8 @@ public void testPatchedBase510() throws Exception { OrcFile.writerOptions(conf) .inspector(inspector) .stripeSize(100000) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -863,7 +905,8 @@ public void testPatchedBase511() throws Exception { OrcFile.writerOptions(conf) .inspector(inspector) .stripeSize(100000) - .bufferSize(10000)); + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } @@ -895,7 +938,11 @@ public void testPatchedBaseMax1() throws Exception { input.set(511, Long.MAX_VALUE); Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000)); + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + 
.encodingStrategy(encodingStrategy)); for (Long l : input) { writer.addRow(l); } @@ -929,7 +976,11 @@ public void testPatchedBaseMax2() throws Exception { input.set(511, Long.MAX_VALUE); Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000)); + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for (Long l : input) { writer.addRow(l); } @@ -976,7 +1027,11 @@ public void testPatchedBaseMax3() throws Exception { input.add(33333L); Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000)); + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for (Long l : input) { writer.addRow(l); } @@ -1026,7 +1081,11 @@ public void testPatchedBaseMax4() throws Exception { input.add(Long.MAX_VALUE); Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000)); + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); for (Long l : input) { writer.addRow(l); } @@ -1051,7 +1110,11 @@ public void testPatchedBaseTimestamp() throws Exception { } Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000)); + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); List tslist = Lists.newArrayList(); tslist.add(Timestamp.valueOf("9999-01-01 00:00:00")); @@ -1114,7 +1177,11 @@ public void testDirectLargeNegatives() throws Exception { } Writer writer = OrcFile.createWriter(testFilePath, - 
OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000)); + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + .encodingStrategy(encodingStrategy)); writer.addRow(-7486502418706614742L); writer.addRow(0L); @@ -1157,7 +1224,8 @@ public void testSeek() throws Exception { .compress(CompressionKind.NONE) .stripeSize(100000) .bufferSize(10000) - .version(OrcFile.Version.V_0_11)); + .version(OrcFile.Version.V_0_11) + .encodingStrategy(encodingStrategy)); for(Long l : input) { writer.addRow(l); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestSerializationUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestSerializationUtils.java index 7ba2036..b689d2b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestSerializationUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestSerializationUtils.java @@ -36,9 +36,9 @@ private InputStream fromBuffer(ByteArrayOutputStream buffer) { @Test public void testDoubles() throws Exception { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - SerializationUtils.writeDouble(buffer, 1343822337.759); - assertEquals(1343822337.759, - SerializationUtils.readDouble(fromBuffer(buffer)), 0.0001); + SerializationUtils utils = new SerializationUtils(); + utils.writeDouble(buffer, 1343822337.759); + assertEquals(1343822337.759, utils.readDouble(fromBuffer(buffer)), 0.0001); } @Test
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestUnrolledBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestUnrolledBitPack.java
new file mode 100644
index 0000000..da60458
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestUnrolledBitPack.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+/**
+ * Round-trip test that writes a fixed pattern of zeros and one parameterized
+ * value to an ORC file and reads it back. It runs once per value returned by
+ * {@link #data()}; the values range from one byte up to {@code Long.MAX_VALUE}.
+ */
+@RunWith(value = Parameterized.class)
+public class TestUnrolledBitPack {
+
+  /** Non-zero value that is interleaved with zeros in the test input. */
+  private final long val;
+
+  public TestUnrolledBitPack(long val) {
+    this.val = val;
+  }
+
+  /** Returns the constructor arguments, one single-element array per test run. */
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { -1 }, { 1 }, { 7 }, { -128 }, { 32000 }, { 8300000 },
+        { Integer.MAX_VALUE }, { 540000000000L }, { 140000000000000L }, { 36000000000000000L },
+        { Long.MAX_VALUE } };
+    return Arrays.asList(data);
+  }
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  /** Opens the local file system and deletes the output file of an earlier run, if any. */
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  /** Writes the interleaved pattern uncompressed and verifies every row that is read back. */
+  @Test
+  public void testBitPacking() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    long[] inp = new long[] { val, 0, val, val, 0, val, 0, val, val, 0, val, 0, val, val, 0, 0,
+        val, val, 0, val, 0, 0, val, 0, val, 0, val, 0, 0, val, 0, val, 0, val, 0, 0, val, 0, val,
+        0, val, 0, 0, val, 0, val, 0, val, 0, 0, val, 0, val, 0, val, 0, 0, val, 0, val, 0, val, 0,
+        0, val, 0, val, 0, val, 0, 0, val, 0, val, 0, val, 0, 0, val, 0, val, 0, val, 0, 0, val, 0,
+        val, 0, val, 0, 0, val, 0, val, 0, 0, val, val };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000)
+            .compress(CompressionKind.NONE).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      while (rows.hasNext()) {
+        Object row = rows.next(null);
+        assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+      }
+    } finally {
+      rows.close();
+    }
+    // Without this check the test passes even when the reader returns too few rows.
+    assertEquals(input.size(), idx);
+  }
+}