diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d26573e..7932a3d 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -552,6 +552,9 @@
true),
// Define the default compression codec for ORC file
HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
+ // Define the default encoding strategy to use
+ HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED",
+ new StringsValidator("SPEED", "COMPRESSION")),
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 8a74e4e..6780688 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1970,6 +1970,17 @@
+ hive.exec.orc.encoding.strategy
+ SPEED
+
+ Define the encoding strategy to use while writing data. Changing this will
+ only affect the light weight encoding for integers. This flag will not
+ change the compression level of higher level compression codec (like ZLIB).
+ Possible options are SPEED and COMPRESSION.
+
+
+
+
hive.exec.orc.dictionary.key.size.threshold
0.8
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 7c542c1..7686f82 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -95,6 +95,9 @@ public int getMinor() {
}
}
+ public static enum EncodingStrategy {
+ SPEED, COMPRESSION;
+ }
// Note : these string definitions for table properties are deprecated,
// and retained only for backward compatibility, please do not add to
@@ -117,7 +120,8 @@ public int getMinor() {
STRIPE_SIZE("orc.stripe.size"),
ROW_INDEX_STRIDE("orc.row.index.stride"),
ENABLE_INDEXES("orc.create.index"),
- BLOCK_PADDING("orc.block.padding");
+ BLOCK_PADDING("orc.block.padding"),
+ ENCODING_STRATEGY("orc.encoding.strategy");
private final String propName;
@@ -221,6 +225,7 @@ public static Reader createReader(Path path,
private MemoryManager memoryManagerValue;
private Version versionValue;
private WriterCallback callback;
+ private EncodingStrategy encodingStrategy;
WriterOptions(Configuration conf) {
configuration = conf;
@@ -250,6 +255,13 @@ public static Reader createReader(Path path,
} else {
versionValue = Version.byName(versionName);
}
+ String enString =
+ conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname);
+ if (enString == null) {
+ encodingStrategy = EncodingStrategy.SPEED;
+ } else {
+ encodingStrategy = EncodingStrategy.valueOf(enString);
+ }
}
/**
@@ -301,6 +313,14 @@ public WriterOptions blockPadding(boolean value) {
}
/**
+ * Sets the encoding strategy that is used to encode the data.
+ */
+ public WriterOptions encodingStrategy(EncodingStrategy strategy) {
+ encodingStrategy = strategy;
+ return this;
+ }
+
+ /**
* Sets the generic compression that is used to compress the data.
*/
public WriterOptions compress(CompressionKind value) {
@@ -370,7 +390,7 @@ public static Writer createWriter(Path path,
opts.stripeSizeValue, opts.compressValue,
opts.bufferSizeValue, opts.rowIndexStrideValue,
opts.memoryManagerValue, opts.blockPaddingValue,
- opts.versionValue, opts.callback);
+ opts.versionValue, opts.callback, opts.encodingStrategy);
}
/**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 578d923..00e0807 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -164,6 +165,11 @@ private String getSettingFromPropsFallingBackToConf(String key, Properties props
options.blockPadding(Boolean.parseBoolean(propVal));
}
+ if ((propVal = getSettingFromPropsFallingBackToConf(
+ OrcFile.OrcTableProperties.ENCODING_STRATEGY.getPropName(),props,conf)) != null){
+ options.encodingStrategy(EncodingStrategy.valueOf(propVal));
+ }
+
return options;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index e033232..4ec07cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -23,7 +23,6 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -746,9 +745,11 @@ void skipRows(long items) throws IOException {
private static class FloatTreeReader extends TreeReader{
private InStream stream;
+ private final SerializationUtils utils;
FloatTreeReader(Path path, int columnId, Configuration conf) {
super(path, columnId, conf);
+ this.utils = new SerializationUtils();
}
@Override
@@ -777,7 +778,7 @@ Object next(Object previous) throws IOException {
} else {
result = (FloatWritable) previous;
}
- result.set(SerializationUtils.readFloat(stream));
+ result.set(utils.readFloat(stream));
}
return result;
}
@@ -797,7 +798,7 @@ Object nextVector(Object previousVector, long batchSize) throws IOException {
// Read value entries based on isNull entries
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
- result.vector[i] = SerializationUtils.readFloat(stream);
+ result.vector[i] = utils.readFloat(stream);
} else {
// If the value is not present then set NaN
@@ -819,16 +820,18 @@ Object nextVector(Object previousVector, long batchSize) throws IOException {
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; ++i) {
- SerializationUtils.readFloat(stream);
+ utils.readFloat(stream);
}
}
}
private static class DoubleTreeReader extends TreeReader{
private InStream stream;
+ private final SerializationUtils utils;
DoubleTreeReader(Path path, int columnId, Configuration conf) {
super(path, columnId, conf);
+ this.utils = new SerializationUtils();
}
@Override
@@ -858,7 +861,7 @@ Object next(Object previous) throws IOException {
} else {
result = (DoubleWritable) previous;
}
- result.set(SerializationUtils.readDouble(stream));
+ result.set(utils.readDouble(stream));
}
return result;
}
@@ -878,7 +881,7 @@ Object nextVector(Object previousVector, long batchSize) throws IOException {
// Read value entries based on isNull entries
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
- result.vector[i] = SerializationUtils.readDouble(stream);
+ result.vector[i] = utils.readDouble(stream);
} else {
// If the value is not present then set NaN
result.vector[i] = Double.NaN;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
index 1756c3c..9642971 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
@@ -34,10 +34,12 @@
private int delta = 0;
private int used = 0;
private boolean repeat = false;
+ private SerializationUtils utils;
RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
this.input = input;
this.signed = signed;
+ this.utils = new SerializationUtils();
}
private void readValues() throws IOException {
@@ -55,9 +57,9 @@ private void readValues() throws IOException {
// convert from 0 to 255 to -128 to 127 by converting to a signed byte
delta = (byte) (0 + delta);
if (signed) {
- literals[0] = SerializationUtils.readVslong(input);
+ literals[0] = utils.readVslong(input);
} else {
- literals[0] = SerializationUtils.readVulong(input);
+ literals[0] = utils.readVulong(input);
}
} else {
repeat = false;
@@ -65,9 +67,9 @@ private void readValues() throws IOException {
used = 0;
for(int i=0; i < numLiterals; ++i) {
if (signed) {
- literals[i] = SerializationUtils.readVslong(input);
+ literals[i] = utils.readVslong(input);
} else {
- literals[i] = SerializationUtils.readVulong(input);
+ literals[i] = utils.readVulong(input);
}
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
index a7e66a8..4057036 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -39,12 +39,14 @@
private int numLiterals = 0;
private int used = 0;
private final boolean skipCorrupt;
+ private final SerializationUtils utils;
RunLengthIntegerReaderV2(InStream input, boolean signed,
Configuration conf) throws IOException {
this.input = input;
this.signed = signed;
this.skipCorrupt = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
+ this.utils = new SerializationUtils();
}
private void readValues() throws IOException {
@@ -71,7 +73,7 @@ private void readDeltaValues(int firstByte) throws IOException {
// extract the number of fixed bits
int fb = (firstByte >>> 1) & 0x1f;
if (fb != 0) {
- fb = SerializationUtils.decodeBitWidth(fb);
+ fb = utils.decodeBitWidth(fb);
}
// extract the blob run length
@@ -81,9 +83,9 @@ private void readDeltaValues(int firstByte) throws IOException {
// read the first value stored as vint
long firstVal = 0;
if (signed) {
- firstVal = SerializationUtils.readVslong(input);
+ firstVal = utils.readVslong(input);
} else {
- firstVal = SerializationUtils.readVulong(input);
+ firstVal = utils.readVulong(input);
}
// store first value to result buffer
@@ -94,14 +96,14 @@ private void readDeltaValues(int firstByte) throws IOException {
if (fb == 0) {
// read the fixed delta value stored as vint (deltas can be negative even
// if all number are positive)
- long fd = SerializationUtils.readVslong(input);
+ long fd = utils.readVslong(input);
// add fixed deltas to adjacent values
for(int i = 0; i < len; i++) {
literals[numLiterals++] = literals[numLiterals - 2] + fd;
}
} else {
- long deltaBase = SerializationUtils.readVslong(input);
+ long deltaBase = utils.readVslong(input);
// add delta base and first value
literals[numLiterals++] = firstVal + deltaBase;
prevVal = literals[numLiterals - 1];
@@ -110,7 +112,7 @@ private void readDeltaValues(int firstByte) throws IOException {
// write the unpacked values, add it to previous value and store final
// value to result buffer. if the delta base value is negative then it
// is a decreasing sequence else an increasing sequence
- SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+ utils.readInts(literals, numLiterals, len, fb, input);
while (len > 0) {
if (deltaBase < 0) {
literals[numLiterals] = prevVal - literals[numLiterals];
@@ -128,7 +130,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
// extract the number of fixed bits
int fbo = (firstByte >>> 1) & 0x1f;
- int fb = SerializationUtils.decodeBitWidth(fbo);
+ int fb = utils.decodeBitWidth(fbo);
// extract the run length of data blob
int len = (firstByte & 0x01) << 8;
@@ -144,7 +146,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
// extract patch width
int pwo = thirdByte & 0x1f;
- int pw = SerializationUtils.decodeBitWidth(pwo);
+ int pw = utils.decodeBitWidth(pwo);
// read fourth byte and extract patch gap width
int fourthByte = input.read();
@@ -156,7 +158,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
int pl = fourthByte & 0x1f;
// read the next base width number of bytes to extract base value
- long base = SerializationUtils.bytesToLongBE(input, bw);
+ long base = utils.bytesToLongBE(input, bw);
long mask = (1L << ((bw * 8) - 1));
// if MSB of base value is 1 then base is negative value else positive
if ((base & mask) != 0) {
@@ -166,7 +168,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
// unpack the data blob
long[] unpacked = new long[len];
- SerializationUtils.readInts(unpacked, 0, len, fb, input);
+ utils.readInts(unpacked, 0, len, fb, input);
// unpack the patch blob
long[] unpackedPatch = new long[pl];
@@ -174,8 +176,8 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
if ((pw + pgw) > 64 && !skipCorrupt) {
throw new IOException(ErrorMsg.ORC_CORRUPTED_READ.getMsg());
}
- int bitSize = SerializationUtils.getClosestFixedBits(pw + pgw);
- SerializationUtils.readInts(unpackedPatch, 0, pl, bitSize, input);
+ int bitSize = utils.getClosestFixedBits(pw + pgw);
+ utils.readInts(unpackedPatch, 0, pl, bitSize, input);
// apply the patch directly when decoding the packed data
int patchIdx = 0;
@@ -241,7 +243,7 @@ private void readDirectValues(int firstByte) throws IOException {
// extract the number of fixed bits
int fbo = (firstByte >>> 1) & 0x1f;
- int fb = SerializationUtils.decodeBitWidth(fbo);
+ int fb = utils.decodeBitWidth(fbo);
// extract the run length
int len = (firstByte & 0x01) << 8;
@@ -250,11 +252,10 @@ private void readDirectValues(int firstByte) throws IOException {
len += 1;
// write the unpacked values and zigzag decode to result buffer
- SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+ utils.readInts(literals, numLiterals, len, fb, input);
if (signed) {
for(int i = 0; i < len; i++) {
- literals[numLiterals] = SerializationUtils
- .zigzagDecode(literals[numLiterals]);
+ literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
numLiterals++;
}
} else {
@@ -275,10 +276,10 @@ private void readShortRepeatValues(int firstByte) throws IOException {
len += RunLengthIntegerWriterV2.MIN_REPEAT;
// read the repeated value which is store using fixed bytes
- long val = SerializationUtils.bytesToLongBE(input, size);
+ long val = utils.bytesToLongBE(input, size);
if (signed) {
- val = SerializationUtils.zigzagDecode(val);
+ val = utils.zigzagDecode(val);
}
// repeat the value for length times
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
index 539f8df..078eae8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
@@ -38,11 +38,13 @@
private long delta = 0;
private boolean repeat = false;
private int tailRunLength = 0;
+ private SerializationUtils utils;
RunLengthIntegerWriter(PositionedOutputStream output,
boolean signed) {
this.output = output;
this.signed = signed;
+ this.utils = new SerializationUtils();
}
private void writeValues() throws IOException {
@@ -51,17 +53,17 @@ private void writeValues() throws IOException {
output.write(numLiterals - MIN_REPEAT_SIZE);
output.write((byte) delta);
if (signed) {
- SerializationUtils.writeVslong(output, literals[0]);
+ utils.writeVslong(output, literals[0]);
} else {
- SerializationUtils.writeVulong(output, literals[0]);
+ utils.writeVulong(output, literals[0]);
}
} else {
output.write(-numLiterals);
for(int i=0; i < numLiterals; ++i) {
if (signed) {
- SerializationUtils.writeVslong(output, literals[i]);
+ utils.writeVslong(output, literals[i]);
} else {
- SerializationUtils.writeVulong(output, literals[i]);
+ utils.writeVulong(output, literals[i]);
}
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
index 171ff89..3a0ba1d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
@@ -157,10 +157,19 @@
private long[] gapVsPatchList;
private long min;
private boolean isFixedDelta;
+ private SerializationUtils utils;
+ private boolean alignedBitpacking;
RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+ this(output, signed, true);
+ }
+
+ RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
+ boolean alignedBitpacking) {
this.output = output;
this.signed = signed;
+ this.alignedBitpacking = alignedBitpacking;
+ this.utils = new SerializationUtils();
clear();
}
@@ -187,6 +196,10 @@ private void writeDeltaValues() throws IOException {
int fb = bitsDeltaMax;
int efb = 0;
+ if (alignedBitpacking) {
+ fb = utils.getClosestAlignedFixedBits(fb);
+ }
+
if (isFixedDelta) {
// if fixed run length is greater than threshold then it will be fixed
// delta sequence with delta value 0 else fixed delta sequence with
@@ -206,20 +219,20 @@ private void writeDeltaValues() throws IOException {
if (fb == 1) {
fb = 2;
}
- efb = SerializationUtils.encodeBitWidth(fb);
+ efb = utils.encodeBitWidth(fb);
efb = efb << 1;
len = variableRunLength - 1;
variableRunLength = 0;
}
// extract the 9th bit of run length
- int tailBits = (len & 0x100) >>> 8;
+ final int tailBits = (len & 0x100) >>> 8;
// create first byte of the header
- int headerFirstByte = getOpcode() | efb | tailBits;
+ final int headerFirstByte = getOpcode() | efb | tailBits;
// second byte of the header stores the remaining 8 bits of runlength
- int headerSecondByte = len & 0xff;
+ final int headerSecondByte = len & 0xff;
// write header
output.write(headerFirstByte);
@@ -227,43 +240,50 @@ private void writeDeltaValues() throws IOException {
// store the first value from zigzag literal array
if (signed) {
- SerializationUtils.writeVslong(output, literals[0]);
+ utils.writeVslong(output, literals[0]);
} else {
- SerializationUtils.writeVulong(output, literals[0]);
+ utils.writeVulong(output, literals[0]);
}
if (isFixedDelta) {
// if delta is fixed then we don't need to store delta blob
- SerializationUtils.writeVslong(output, fixedDelta);
+ utils.writeVslong(output, fixedDelta);
} else {
// store the first value as delta value using zigzag encoding
- SerializationUtils.writeVslong(output, adjDeltas[0]);
+ utils.writeVslong(output, adjDeltas[0]);
+
// adjacent delta values are bit packed
- SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb,
- output);
+ utils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, output);
}
}
private void writePatchedBaseValues() throws IOException {
+ // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+ // because patch is applied to MSB bits. For example: If fixed bit width of
+ // base value is 7 bits and if patch is 3 bits, the actual value is
+ // constructed by shifting the patch to left by 7 positions.
+ // actual_value = patch << 7 | base_value
+ // So, if we align base_value then actual_value can not be reconstructed.
+
// write the number of fixed bits required in next 5 bits
- int fb = brBits95p;
- int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+ final int fb = brBits95p;
+ final int efb = utils.encodeBitWidth(fb) << 1;
// adjust variable run length, they are one off
variableRunLength -= 1;
// extract the 9th bit of run length
- int tailBits = (variableRunLength & 0x100) >>> 8;
+ final int tailBits = (variableRunLength & 0x100) >>> 8;
// create first byte of the header
- int headerFirstByte = getOpcode() | efb | tailBits;
+ final int headerFirstByte = getOpcode() | efb | tailBits;
// second byte of the header stores the remaining 8 bits of runlength
- int headerSecondByte = variableRunLength & 0xff;
+ final int headerSecondByte = variableRunLength & 0xff;
// if the min value is negative toggle the sign
- boolean isNegative = min < 0 ? true : false;
+ final boolean isNegative = min < 0 ? true : false;
if (isNegative) {
min = -min;
}
@@ -271,9 +291,9 @@ private void writePatchedBaseValues() throws IOException {
// find the number of bytes required for base and shift it by 5 bits
// to accommodate patch width. The additional bit is used to store the sign
// of the base value.
- int baseWidth = SerializationUtils.findClosestNumBits(min) + 1;
- int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
- int bb = (baseBytes - 1) << 5;
+ final int baseWidth = utils.findClosestNumBits(min) + 1;
+ final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ final int bb = (baseBytes - 1) << 5;
// if the base value is negative then set MSB to 1
if (isNegative) {
@@ -282,11 +302,11 @@ private void writePatchedBaseValues() throws IOException {
// third byte contains 3 bits for number of bytes occupied by base
// and 5 bits for patchWidth
- int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+ final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
// fourth byte contains 3 bits for page gap width and 5 bits for
// patch length
- int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+ final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
// write header
output.write(headerFirstByte);
@@ -301,15 +321,16 @@ private void writePatchedBaseValues() throws IOException {
}
// base reduced literals are bit packed
- int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
- SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length,
- closestFixedBits, output);
+ int closestFixedBits = utils.getClosestFixedBits(fb);
+
+ utils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, closestFixedBits,
+ output);
// write patch list
- closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
- + patchWidth);
- SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length,
- closestFixedBits, output);
+ closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
+
+ utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
+ output);
// reset run length
variableRunLength = 0;
@@ -326,27 +347,32 @@ private int getOpcode() {
private void writeDirectValues() throws IOException {
// write the number of fixed bits required in next 5 bits
- int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+ int fb = zzBits100p;
+
+ if (alignedBitpacking) {
+ fb = utils.getClosestAlignedFixedBits(fb);
+ }
+
+ final int efb = utils.encodeBitWidth(fb) << 1;
// adjust variable run length
variableRunLength -= 1;
// extract the 9th bit of run length
- int tailBits = (variableRunLength & 0x100) >>> 8;
+ final int tailBits = (variableRunLength & 0x100) >>> 8;
// create first byte of the header
- int headerFirstByte = getOpcode() | efb | tailBits;
+ final int headerFirstByte = getOpcode() | efb | tailBits;
// second byte of the header stores the remaining 8 bits of runlength
- int headerSecondByte = variableRunLength & 0xff;
+ final int headerSecondByte = variableRunLength & 0xff;
// write header
output.write(headerFirstByte);
output.write(headerSecondByte);
// bit packing the zigzag encoded literals
- SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length,
- zzBits100p, output);
+ utils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, fb, output);
// reset run length
variableRunLength = 0;
@@ -356,13 +382,13 @@ private void writeShortRepeatValues() throws IOException {
// get the value that is repeating, compute the bits and bytes required
long repeatVal = 0;
if (signed) {
- repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+ repeatVal = utils.zigzagEncode(literals[0]);
} else {
repeatVal = literals[0];
}
- int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
- int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
+ final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
: (numBitsRepeatVal >>> 3) + 1;
// write encoding type in top 2 bits
@@ -440,7 +466,7 @@ private void determineEncoding() {
// populate zigzag encoded literals
long zzEncVal = 0;
if (signed) {
- zzEncVal = SerializationUtils.zigzagEncode(literals[i]);
+ zzEncVal = utils.zigzagEncode(literals[i]);
} else {
zzEncVal = literals[i];
}
@@ -464,7 +490,7 @@ private void determineEncoding() {
// stores the number of bits required for packing delta blob in
// delta encoding
- bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+ bitsDeltaMax = utils.findClosestNumBits(deltaMax);
// if decreasing count equals total number of literals then the
// sequence is monotonically decreasing
@@ -504,10 +530,10 @@ private void determineEncoding() {
// is not significant then we can use direct or delta encoding
double p = 0.9;
- zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p);
+ zzBits90p = utils.percentileBits(zigzagLiterals, p);
p = 1.0;
- zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p);
+ zzBits100p = utils.percentileBits(zigzagLiterals, p);
int diffBitsLH = zzBits100p - zzBits90p;
@@ -524,11 +550,11 @@ private void determineEncoding() {
// 95th percentile width is used to determine max allowed value
// after which patching will be done
p = 0.95;
- brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p);
+ brBits95p = utils.percentileBits(baseRedLiterals, p);
// 100th percentile is used to compute the max patch width
p = 1.0;
- brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p);
+ brBits100p = utils.percentileBits(baseRedLiterals, p);
// after base reducing the values, if the difference in bits between
// 95th percentile and 100th percentile value is zero then there
@@ -573,7 +599,7 @@ private void preparePatchedBlob() {
// #bit for patch
patchWidth = brBits100p - brBits95p;
- patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+ patchWidth = utils.getClosestFixedBits(patchWidth);
// if patch bit requirement is 64 then it will not possible to pack
// gap and patch together in a long. To make sure gap and patch can be
@@ -619,7 +645,7 @@ private void preparePatchedBlob() {
if (maxGap == 0 && patchLength != 0) {
patchGapWidth = 1;
} else {
- patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+ patchGapWidth = utils.findClosestNumBits(maxGap);
}
// special case: if the patch gap width is greater than 256, then
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
index 20c8c6d..b8af7f0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
@@ -26,10 +26,16 @@
final class SerializationUtils {
- // unused
- private SerializationUtils() {}
+ private final static int BUFFER_SIZE = 64;
+ private final byte[] readBuffer;
+ private final byte[] writeBuffer;
- static void writeVulong(OutputStream output, long value) throws IOException {
+ public SerializationUtils() {
+ this.readBuffer = new byte[BUFFER_SIZE];
+ this.writeBuffer = new byte[BUFFER_SIZE];
+ }
+
+ void writeVulong(OutputStream output, long value) throws IOException {
while (true) {
if ((value & ~0x7f) == 0) {
output.write((byte) value);
@@ -41,12 +47,12 @@ static void writeVulong(OutputStream output, long value) throws IOException {
}
}
- static void writeVslong(OutputStream output, long value) throws IOException {
+ void writeVslong(OutputStream output, long value) throws IOException {
writeVulong(output, (value << 1) ^ (value >> 63));
}
- static long readVulong(InputStream in) throws IOException {
+ long readVulong(InputStream in) throws IOException {
long result = 0;
long b;
int offset = 0;
@@ -61,18 +67,18 @@ static long readVulong(InputStream in) throws IOException {
return result;
}
- static long readVslong(InputStream in) throws IOException {
+ long readVslong(InputStream in) throws IOException {
long result = readVulong(in);
return (result >>> 1) ^ -(result & 1);
}
- static float readFloat(InputStream in) throws IOException {
+ float readFloat(InputStream in) throws IOException {
int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
(in.read() << 24);
return Float.intBitsToFloat(ser);
}
- static void writeFloat(OutputStream output, float value) throws IOException {
+ void writeFloat(OutputStream output, float value) throws IOException {
int ser = Float.floatToIntBits(value);
output.write(ser & 0xff);
output.write((ser >> 8) & 0xff);
@@ -80,29 +86,36 @@ static void writeFloat(OutputStream output, float value) throws IOException {
output.write((ser >> 24) & 0xff);
}
- static double readDouble(InputStream in) throws IOException {
- long ser = (long) in.read() |
- ((long) in.read() << 8) |
- ((long) in.read() << 16) |
- ((long) in.read() << 24) |
- ((long) in.read() << 32) |
- ((long) in.read() << 40) |
- ((long) in.read() << 48) |
- ((long) in.read() << 56);
- return Double.longBitsToDouble(ser);
+ double readDouble(InputStream in) throws IOException {
+ return Double.longBitsToDouble(readLongLE(in));
+ }
+
+ long readLongLE(InputStream in) throws IOException {
+ in.read(readBuffer, 0, 8);
+ return ((readBuffer[0] << 0)
+ + ((readBuffer[1] & 255) << 8)
+ + ((readBuffer[2] & 255) << 16)
+ + ((long) (readBuffer[3] & 255) << 24)
+ + ((long) (readBuffer[4] & 255) << 32)
+ + ((long) (readBuffer[5] & 255) << 40)
+ + ((long) (readBuffer[6] & 255) << 48)
+ + ((long) (readBuffer[7] & 255) << 56));
}
- static void writeDouble(OutputStream output,
- double value) throws IOException {
- long ser = Double.doubleToLongBits(value);
- output.write(((int) ser) & 0xff);
- output.write(((int) (ser >> 8)) & 0xff);
- output.write(((int) (ser >> 16)) & 0xff);
- output.write(((int) (ser >> 24)) & 0xff);
- output.write(((int) (ser >> 32)) & 0xff);
- output.write(((int) (ser >> 40)) & 0xff);
- output.write(((int) (ser >> 48)) & 0xff);
- output.write(((int) (ser >> 56)) & 0xff);
+ void writeDouble(OutputStream output, double value) throws IOException {
+ writeLongLE(output, Double.doubleToLongBits(value));
+ }
+
+ private void writeLongLE(OutputStream output, long value) throws IOException {
+ writeBuffer[0] = (byte) (value >>> 0);
+ writeBuffer[1] = (byte) (value >>> 8);
+ writeBuffer[2] = (byte) (value >>> 16);
+ writeBuffer[3] = (byte) (value >>> 24);
+ writeBuffer[4] = (byte) (value >>> 32);
+ writeBuffer[5] = (byte) (value >>> 40);
+ writeBuffer[6] = (byte) (value >>> 48);
+ writeBuffer[7] = (byte) (value >>> 56);
+ output.write(writeBuffer, 0, 8);
}
/**
@@ -198,7 +211,7 @@ static BigInteger readBigInteger(InputStream input) throws IOException {
* @param value
* @return bits required to store value
*/
- static int findClosestNumBits(long value) {
+ int findClosestNumBits(long value) {
int count = 0;
while (value != 0) {
count++;
@@ -212,7 +225,7 @@ static int findClosestNumBits(long value) {
* @param val
* @return zigzag encoded value
*/
- static long zigzagEncode(long val) {
+ long zigzagEncode(long val) {
return (val << 1) ^ (val >> 63);
}
@@ -221,7 +234,7 @@ static long zigzagEncode(long val) {
* @param val
* @return zizag decoded value
*/
- static long zigzagDecode(long val) {
+ long zigzagDecode(long val) {
return (val >>> 1) ^ -(val & 1);
}
@@ -231,7 +244,7 @@ static long zigzagDecode(long val) {
* @param p - percentile value (>=0.0 to <=1.0)
* @return pth percentile bits
*/
- static int percentileBits(long[] data, double p) {
+ int percentileBits(long[] data, double p) {
if ((p > 1.0) || (p <= 0.0)) {
return -1;
}
@@ -265,7 +278,7 @@ static int percentileBits(long[] data, double p) {
* @param b - byte array
* @return long value
*/
- static long bytesToLongBE(InStream input, int n) throws IOException {
+ long bytesToLongBE(InStream input, int n) throws IOException {
long out = 0;
long val = 0;
while (n > 0) {
@@ -283,7 +296,7 @@ static long bytesToLongBE(InStream input, int n) throws IOException {
* @param numBits - bit width
* @return number of bytes required
*/
- static int getTotalBytesRequired(int n, int numBits) {
+ int getTotalBytesRequired(int n, int numBits) {
return (n * numBits + 7) / 8;
}
@@ -293,7 +306,7 @@ static int getTotalBytesRequired(int n, int numBits) {
* @param n
* @return closest valid fixed bit
*/
- static int getClosestFixedBits(int n) {
+ int getClosestFixedBits(int n) {
if (n == 0) {
return 1;
}
@@ -319,13 +332,39 @@ static int getClosestFixedBits(int n) {
}
}
+ public int getClosestAlignedFixedBits(int n) {
+ if (n == 0 || n == 1) {
+ return 1;
+ } else if (n > 1 && n <= 2) {
+ return 2;
+ } else if (n > 2 && n <= 4) {
+ return 4;
+ } else if (n > 4 && n <= 8) {
+ return 8;
+ } else if (n > 8 && n <= 16) {
+ return 16;
+ } else if (n > 16 && n <= 24) {
+ return 24;
+ } else if (n > 24 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
/**
* Finds the closest available fixed bit width match and returns its encoded
* value (ordinal)
* @param n - fixed bit width to encode
* @return encoded fixed bit width
*/
- static int encodeBitWidth(int n) {
+ int encodeBitWidth(int n) {
n = getClosestFixedBits(n);
if (n >= 1 && n <= 24) {
@@ -354,7 +393,7 @@ static int encodeBitWidth(int n) {
* @param n - encoded fixed bit width
* @return decoded fixed bit width
*/
- static int decodeBitWidth(int n) {
+ int decodeBitWidth(int n) {
if (n >= FixedBitSizes.ONE.ordinal()
&& n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
return n + 1;
@@ -386,13 +425,51 @@ static int decodeBitWidth(int n) {
* @param output - output stream
* @throws IOException
*/
- static void writeInts(long[] input, int offset, int len, int bitSize,
+ void writeInts(long[] input, int offset, int len, int bitSize,
OutputStream output) throws IOException {
if (input == null || input.length < 1 || offset < 0 || len < 1
|| bitSize < 1) {
return;
}
+ switch (bitSize) {
+ case 1:
+ unrolledBitPack1(input, offset, len, output);
+ return;
+ case 2:
+ unrolledBitPack2(input, offset, len, output);
+ return;
+ case 4:
+ unrolledBitPack4(input, offset, len, output);
+ return;
+ case 8:
+ unrolledBitPack8(input, offset, len, output);
+ return;
+ case 16:
+ unrolledBitPack16(input, offset, len, output);
+ return;
+ case 24:
+ unrolledBitPack24(input, offset, len, output);
+ return;
+ case 32:
+ unrolledBitPack32(input, offset, len, output);
+ return;
+ case 40:
+ unrolledBitPack40(input, offset, len, output);
+ return;
+ case 48:
+ unrolledBitPack48(input, offset, len, output);
+ return;
+ case 56:
+ unrolledBitPack56(input, offset, len, output);
+ return;
+ case 64:
+ unrolledBitPack64(input, offset, len, output);
+ return;
+ default:
+ break;
+ }
+
int bitsLeft = 8;
byte current = 0;
for(int i = offset; i < (offset + len); i++) {
@@ -426,6 +503,357 @@ static void writeInts(long[] input, int offset, int len, int bitSize,
}
}
+ private void unrolledBitPack1(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = (int) (val | ((input[i] & 1) << 7)
+ | ((input[i + 1] & 1) << 6)
+ | ((input[i + 2] & 1) << 5)
+ | ((input[i + 3] & 1) << 4)
+ | ((input[i + 4] & 1) << 3)
+ | ((input[i + 5] & 1) << 2)
+ | ((input[i + 6] & 1) << 1)
+ | (input[i + 7]) & 1);
+ output.write(val);
+ val = 0;
+ }
+
+ if (remainder > 0) {
+ int startShift = 7;
+ for (int i = endUnroll; i < endOffset; i++) {
+ val = (int) (val | (input[i] & 1) << startShift);
+ startShift -= 1;
+ }
+ output.write(val);
+ }
+ }
+
+ private void unrolledBitPack2(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ final int numHops = 4;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = (int) (val | ((input[i] & 3) << 6)
+ | ((input[i + 1] & 3) << 4)
+ | ((input[i + 2] & 3) << 2)
+ | (input[i + 3]) & 3);
+ output.write(val);
+ val = 0;
+ }
+
+ if (remainder > 0) {
+ int startShift = 6;
+ for (int i = endUnroll; i < endOffset; i++) {
+ val = (int) (val | (input[i] & 3) << startShift);
+ startShift -= 2;
+ }
+ output.write(val);
+ }
+ }
+
+ private void unrolledBitPack4(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ final int numHops = 2;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15);
+ output.write(val);
+ val = 0;
+ }
+
+ if (remainder > 0) {
+ int startShift = 4;
+ for (int i = endUnroll; i < endOffset; i++) {
+ val = (int) (val | (input[i] & 15) << startShift);
+ startShift -= 4;
+ }
+ output.write(val);
+ }
+ }
+
+ private void unrolledBitPack8(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 1);
+ }
+
+ private void unrolledBitPack16(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 2);
+ }
+
+ private void unrolledBitPack24(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 3);
+ }
+
+ private void unrolledBitPack32(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 4);
+ }
+
+ private void unrolledBitPack40(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 5);
+ }
+
+ private void unrolledBitPack48(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 6);
+ }
+
+ private void unrolledBitPack56(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 7);
+ }
+
+ private void unrolledBitPack64(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 8);
+ }
+
+ private void unrolledBitPackBytes(long[] input, int offset, int len, OutputStream output, int numBytes) throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int i = offset;
+ for (; i < endUnroll; i = i + numHops) {
+ writeLongBE(output, input, i, numHops, numBytes);
+ }
+
+ if (remainder > 0) {
+ writeRemainingLongs(output, i, input, remainder, numBytes);
+ }
+ }
+
+ private void writeRemainingLongs(OutputStream output, int offset, long[] input, int remainder,
+ int numBytes) throws IOException {
+ final int numHops = remainder;
+
+ int idx = 0;
+ switch (numBytes) {
+ case 1:
+ while (remainder > 0) {
+ writeBuffer[idx] = (byte) (input[offset + idx] & 255);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 2:
+ while (remainder > 0) {
+ writeLongBE2(output, input[offset + idx], idx * 2);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 3:
+ while (remainder > 0) {
+ writeLongBE3(output, input[offset + idx], idx * 3);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 4:
+ while (remainder > 0) {
+ writeLongBE4(output, input[offset + idx], idx * 4);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 5:
+ while (remainder > 0) {
+ writeLongBE5(output, input[offset + idx], idx * 5);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 6:
+ while (remainder > 0) {
+ writeLongBE6(output, input[offset + idx], idx * 6);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 7:
+ while (remainder > 0) {
+ writeLongBE7(output, input[offset + idx], idx * 7);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 8:
+ while (remainder > 0) {
+ writeLongBE8(output, input[offset + idx], idx * 8);
+ remainder--;
+ idx++;
+ }
+ break;
+ default:
+ break;
+ }
+
+ final int toWrite = numHops * numBytes;
+ output.write(writeBuffer, 0, toWrite);
+ }
+
+ private void writeLongBE(OutputStream output, long[] input, int offset, int numHops, int numBytes) throws IOException {
+
+ switch (numBytes) {
+ case 1:
+ writeBuffer[0] = (byte) (input[offset + 0] & 255);
+ writeBuffer[1] = (byte) (input[offset + 1] & 255);
+ writeBuffer[2] = (byte) (input[offset + 2] & 255);
+ writeBuffer[3] = (byte) (input[offset + 3] & 255);
+ writeBuffer[4] = (byte) (input[offset + 4] & 255);
+ writeBuffer[5] = (byte) (input[offset + 5] & 255);
+ writeBuffer[6] = (byte) (input[offset + 6] & 255);
+ writeBuffer[7] = (byte) (input[offset + 7] & 255);
+ break;
+ case 2:
+ writeLongBE2(output, input[offset + 0], 0);
+ writeLongBE2(output, input[offset + 1], 2);
+ writeLongBE2(output, input[offset + 2], 4);
+ writeLongBE2(output, input[offset + 3], 6);
+ writeLongBE2(output, input[offset + 4], 8);
+ writeLongBE2(output, input[offset + 5], 10);
+ writeLongBE2(output, input[offset + 6], 12);
+ writeLongBE2(output, input[offset + 7], 14);
+ break;
+ case 3:
+ writeLongBE3(output, input[offset + 0], 0);
+ writeLongBE3(output, input[offset + 1], 3);
+ writeLongBE3(output, input[offset + 2], 6);
+ writeLongBE3(output, input[offset + 3], 9);
+ writeLongBE3(output, input[offset + 4], 12);
+ writeLongBE3(output, input[offset + 5], 15);
+ writeLongBE3(output, input[offset + 6], 18);
+ writeLongBE3(output, input[offset + 7], 21);
+ break;
+ case 4:
+ writeLongBE4(output, input[offset + 0], 0);
+ writeLongBE4(output, input[offset + 1], 4);
+ writeLongBE4(output, input[offset + 2], 8);
+ writeLongBE4(output, input[offset + 3], 12);
+ writeLongBE4(output, input[offset + 4], 16);
+ writeLongBE4(output, input[offset + 5], 20);
+ writeLongBE4(output, input[offset + 6], 24);
+ writeLongBE4(output, input[offset + 7], 28);
+ break;
+ case 5:
+ writeLongBE5(output, input[offset + 0], 0);
+ writeLongBE5(output, input[offset + 1], 5);
+ writeLongBE5(output, input[offset + 2], 10);
+ writeLongBE5(output, input[offset + 3], 15);
+ writeLongBE5(output, input[offset + 4], 20);
+ writeLongBE5(output, input[offset + 5], 25);
+ writeLongBE5(output, input[offset + 6], 30);
+ writeLongBE5(output, input[offset + 7], 35);
+ break;
+ case 6:
+ writeLongBE6(output, input[offset + 0], 0);
+ writeLongBE6(output, input[offset + 1], 6);
+ writeLongBE6(output, input[offset + 2], 12);
+ writeLongBE6(output, input[offset + 3], 18);
+ writeLongBE6(output, input[offset + 4], 24);
+ writeLongBE6(output, input[offset + 5], 30);
+ writeLongBE6(output, input[offset + 6], 36);
+ writeLongBE6(output, input[offset + 7], 42);
+ break;
+ case 7:
+ writeLongBE7(output, input[offset + 0], 0);
+ writeLongBE7(output, input[offset + 1], 7);
+ writeLongBE7(output, input[offset + 2], 14);
+ writeLongBE7(output, input[offset + 3], 21);
+ writeLongBE7(output, input[offset + 4], 28);
+ writeLongBE7(output, input[offset + 5], 35);
+ writeLongBE7(output, input[offset + 6], 42);
+ writeLongBE7(output, input[offset + 7], 49);
+ break;
+ case 8:
+ writeLongBE8(output, input[offset + 0], 0);
+ writeLongBE8(output, input[offset + 1], 8);
+ writeLongBE8(output, input[offset + 2], 16);
+ writeLongBE8(output, input[offset + 3], 24);
+ writeLongBE8(output, input[offset + 4], 32);
+ writeLongBE8(output, input[offset + 5], 40);
+ writeLongBE8(output, input[offset + 6], 48);
+ writeLongBE8(output, input[offset + 7], 56);
+ break;
+ default:
+ break;
+ }
+
+ final int toWrite = numHops * numBytes;
+ output.write(writeBuffer, 0, toWrite);
+ }
+
+ private void writeLongBE2(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE3(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE4(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE5(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE6(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 40);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 5] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE7(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 48);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 40);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 5] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 6] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE8(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 56);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 48);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 40);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 5] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 6] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 7] = (byte) (val >>> 0);
+ }
+
/**
* Read bitpacked integers from input stream
* @param buffer - input buffer
@@ -435,11 +863,49 @@ static void writeInts(long[] input, int offset, int len, int bitSize,
* @param input - input stream
* @throws IOException
*/
- static void readInts(long[] buffer, int offset, int len, int bitSize,
+ void readInts(long[] buffer, int offset, int len, int bitSize,
InStream input) throws IOException {
int bitsLeft = 0;
int current = 0;
+ switch (bitSize) {
+ case 1:
+ unrolledUnPack1(buffer, offset, len, input);
+ return;
+ case 2:
+ unrolledUnPack2(buffer, offset, len, input);
+ return;
+ case 4:
+ unrolledUnPack4(buffer, offset, len, input);
+ return;
+ case 8:
+ unrolledUnPack8(buffer, offset, len, input);
+ return;
+ case 16:
+ unrolledUnPack16(buffer, offset, len, input);
+ return;
+ case 24:
+ unrolledUnPack24(buffer, offset, len, input);
+ return;
+ case 32:
+ unrolledUnPack32(buffer, offset, len, input);
+ return;
+ case 40:
+ unrolledUnPack40(buffer, offset, len, input);
+ return;
+ case 48:
+ unrolledUnPack48(buffer, offset, len, input);
+ return;
+ case 56:
+ unrolledUnPack56(buffer, offset, len, input);
+ return;
+ case 64:
+ unrolledUnPack64(buffer, offset, len, input);
+ return;
+ default:
+ break;
+ }
+
for(int i = offset; i < (offset + len); i++) {
long result = 0;
int bitsLeftToRead = bitSize;
@@ -460,4 +926,362 @@ static void readInts(long[] buffer, int offset, int len, int bitSize,
buffer[i] = result;
}
}
+
+
+ private void unrolledUnPack1(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = input.read();
+ buffer[i] = (val >>> 7) & 1;
+ buffer[i + 1] = (val >>> 6) & 1;
+ buffer[i + 2] = (val >>> 5) & 1;
+ buffer[i + 3] = (val >>> 4) & 1;
+ buffer[i + 4] = (val >>> 3) & 1;
+ buffer[i + 5] = (val >>> 2) & 1;
+ buffer[i + 6] = (val >>> 1) & 1;
+ buffer[i + 7] = val & 1;
+ }
+
+ if (remainder > 0) {
+ int startShift = 7;
+ val = input.read();
+ for (int i = endUnroll; i < endOffset; i++) {
+ buffer[i] = (val >>> startShift) & 1;
+ startShift -= 1;
+ }
+ }
+ }
+
+ private void unrolledUnPack2(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ final int numHops = 4;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = input.read();
+ buffer[i] = (val >>> 6) & 3;
+ buffer[i + 1] = (val >>> 4) & 3;
+ buffer[i + 2] = (val >>> 2) & 3;
+ buffer[i + 3] = val & 3;
+ }
+
+ if (remainder > 0) {
+ int startShift = 6;
+ val = input.read();
+ for (int i = endUnroll; i < endOffset; i++) {
+ buffer[i] = (val >>> startShift) & 3;
+ startShift -= 2;
+ }
+ }
+ }
+
+ private void unrolledUnPack4(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ final int numHops = 2;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = input.read();
+ buffer[i] = (val >>> 4) & 15;
+ buffer[i + 1] = val & 15;
+ }
+
+ if (remainder > 0) {
+ int startShift = 4;
+ val = input.read();
+ for (int i = endUnroll; i < endOffset; i++) {
+ buffer[i] = (val >>> startShift) & 15;
+ startShift -= 4;
+ }
+ }
+ }
+
+ private void unrolledUnPack8(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 1);
+ }
+
+ private void unrolledUnPack16(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 2);
+ }
+
+ private void unrolledUnPack24(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 3);
+ }
+
+ private void unrolledUnPack32(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 4);
+ }
+
+ private void unrolledUnPack40(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 5);
+ }
+
+ private void unrolledUnPack48(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 6);
+ }
+
+ private void unrolledUnPack56(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 7);
+ }
+
+ private void unrolledUnPack64(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 8);
+ }
+
+ private void unrolledUnPackBytes(long[] buffer, int offset, int len, InStream input, int numBytes)
+ throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int i = offset;
+ for (; i < endUnroll; i = i + numHops) {
+ readLongBE(input, buffer, i, numHops, numBytes);
+ }
+
+ if (remainder > 0) {
+ readRemainingLongs(buffer, i, input, remainder, numBytes);
+ }
+ }
+
+ private void readRemainingLongs(long[] buffer, int offset, InStream input, int remainder,
+ int numBytes) throws IOException {
+ final int toRead = remainder * numBytes;
+ // bulk read to buffer
+ int bytesRead = input.read(readBuffer, 0, toRead);
+ while (bytesRead != toRead) {
+ bytesRead += input.read(readBuffer, bytesRead, toRead - bytesRead);
+ }
+
+ int idx = 0;
+ switch (numBytes) {
+ case 1:
+ while (remainder > 0) {
+ buffer[offset++] = readBuffer[idx] & 255;
+ remainder--;
+ idx++;
+ }
+ break;
+ case 2:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE2(input, idx * 2);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 3:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE3(input, idx * 3);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 4:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE4(input, idx * 4);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 5:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE5(input, idx * 5);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 6:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE6(input, idx * 6);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 7:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE7(input, idx * 7);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 8:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE8(input, idx * 8);
+ remainder--;
+ idx++;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void readLongBE(InStream in, long[] buffer, int start, int numHops, int numBytes)
+ throws IOException {
+ final int toRead = numHops * numBytes;
+ // bulk read to buffer
+ int bytesRead = in.read(readBuffer, 0, toRead);
+ while (bytesRead != toRead) {
+ bytesRead += in.read(readBuffer, bytesRead, toRead - bytesRead);
+ }
+
+ switch (numBytes) {
+ case 1:
+ buffer[start + 0] = readBuffer[0] & 255;
+ buffer[start + 1] = readBuffer[1] & 255;
+ buffer[start + 2] = readBuffer[2] & 255;
+ buffer[start + 3] = readBuffer[3] & 255;
+ buffer[start + 4] = readBuffer[4] & 255;
+ buffer[start + 5] = readBuffer[5] & 255;
+ buffer[start + 6] = readBuffer[6] & 255;
+ buffer[start + 7] = readBuffer[7] & 255;
+ break;
+ case 2:
+ buffer[start + 0] = readLongBE2(in, 0);
+ buffer[start + 1] = readLongBE2(in, 2);
+ buffer[start + 2] = readLongBE2(in, 4);
+ buffer[start + 3] = readLongBE2(in, 6);
+ buffer[start + 4] = readLongBE2(in, 8);
+ buffer[start + 5] = readLongBE2(in, 10);
+ buffer[start + 6] = readLongBE2(in, 12);
+ buffer[start + 7] = readLongBE2(in, 14);
+ break;
+ case 3:
+ buffer[start + 0] = readLongBE3(in, 0);
+ buffer[start + 1] = readLongBE3(in, 3);
+ buffer[start + 2] = readLongBE3(in, 6);
+ buffer[start + 3] = readLongBE3(in, 9);
+ buffer[start + 4] = readLongBE3(in, 12);
+ buffer[start + 5] = readLongBE3(in, 15);
+ buffer[start + 6] = readLongBE3(in, 18);
+ buffer[start + 7] = readLongBE3(in, 21);
+ break;
+ case 4:
+ buffer[start + 0] = readLongBE4(in, 0);
+ buffer[start + 1] = readLongBE4(in, 4);
+ buffer[start + 2] = readLongBE4(in, 8);
+ buffer[start + 3] = readLongBE4(in, 12);
+ buffer[start + 4] = readLongBE4(in, 16);
+ buffer[start + 5] = readLongBE4(in, 20);
+ buffer[start + 6] = readLongBE4(in, 24);
+ buffer[start + 7] = readLongBE4(in, 28);
+ break;
+ case 5:
+ buffer[start + 0] = readLongBE5(in, 0);
+ buffer[start + 1] = readLongBE5(in, 5);
+ buffer[start + 2] = readLongBE5(in, 10);
+ buffer[start + 3] = readLongBE5(in, 15);
+ buffer[start + 4] = readLongBE5(in, 20);
+ buffer[start + 5] = readLongBE5(in, 25);
+ buffer[start + 6] = readLongBE5(in, 30);
+ buffer[start + 7] = readLongBE5(in, 35);
+ break;
+ case 6:
+ buffer[start + 0] = readLongBE6(in, 0);
+ buffer[start + 1] = readLongBE6(in, 6);
+ buffer[start + 2] = readLongBE6(in, 12);
+ buffer[start + 3] = readLongBE6(in, 18);
+ buffer[start + 4] = readLongBE6(in, 24);
+ buffer[start + 5] = readLongBE6(in, 30);
+ buffer[start + 6] = readLongBE6(in, 36);
+ buffer[start + 7] = readLongBE6(in, 42);
+ break;
+ case 7:
+ buffer[start + 0] = readLongBE7(in, 0);
+ buffer[start + 1] = readLongBE7(in, 7);
+ buffer[start + 2] = readLongBE7(in, 14);
+ buffer[start + 3] = readLongBE7(in, 21);
+ buffer[start + 4] = readLongBE7(in, 28);
+ buffer[start + 5] = readLongBE7(in, 35);
+ buffer[start + 6] = readLongBE7(in, 42);
+ buffer[start + 7] = readLongBE7(in, 49);
+ break;
+ case 8:
+ buffer[start + 0] = readLongBE8(in, 0);
+ buffer[start + 1] = readLongBE8(in, 8);
+ buffer[start + 2] = readLongBE8(in, 16);
+ buffer[start + 3] = readLongBE8(in, 24);
+ buffer[start + 4] = readLongBE8(in, 32);
+ buffer[start + 5] = readLongBE8(in, 40);
+ buffer[start + 6] = readLongBE8(in, 48);
+ buffer[start + 7] = readLongBE8(in, 56);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private long readLongBE2(InStream in, int rbOffset) {
+ return (((readBuffer[rbOffset] & 255) << 8)
+ + ((readBuffer[rbOffset + 1] & 255) << 0));
+ }
+
+ private long readLongBE3(InStream in, int rbOffset) {
+ return (((readBuffer[rbOffset] & 255) << 16)
+ + ((readBuffer[rbOffset + 1] & 255) << 8)
+ + ((readBuffer[rbOffset + 2] & 255) << 0));
+ }
+
+ private long readLongBE4(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 24)
+ + ((readBuffer[rbOffset + 1] & 255) << 16)
+ + ((readBuffer[rbOffset + 2] & 255) << 8)
+ + ((readBuffer[rbOffset + 3] & 255) << 0));
+ }
+
+ private long readLongBE5(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 24)
+ + ((readBuffer[rbOffset + 2] & 255) << 16)
+ + ((readBuffer[rbOffset + 3] & 255) << 8)
+ + ((readBuffer[rbOffset + 4] & 255) << 0));
+ }
+
+ private long readLongBE6(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 40)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 2] & 255) << 24)
+ + ((readBuffer[rbOffset + 3] & 255) << 16)
+ + ((readBuffer[rbOffset + 4] & 255) << 8)
+ + ((readBuffer[rbOffset + 5] & 255) << 0));
+ }
+
+ private long readLongBE7(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 48)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 40)
+ + ((long) (readBuffer[rbOffset + 2] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 3] & 255) << 24)
+ + ((readBuffer[rbOffset + 4] & 255) << 16)
+ + ((readBuffer[rbOffset + 5] & 255) << 8)
+ + ((readBuffer[rbOffset + 6] & 255) << 0));
+ }
+
+ private long readLongBE8(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 56)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 48)
+ + ((long) (readBuffer[rbOffset + 2] & 255) << 40)
+ + ((long) (readBuffer[rbOffset + 3] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 4] & 255) << 24)
+ + ((readBuffer[rbOffset + 5] & 255) << 16)
+ + ((readBuffer[rbOffset + 6] & 255) << 8)
+ + ((readBuffer[rbOffset + 7] & 255) << 0));
+ }
+
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index b3ab96f..56f6e57 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -28,6 +28,7 @@
import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
@@ -134,6 +136,7 @@
private final Configuration conf;
private final OrcFile.WriterCallback callback;
private final OrcFile.WriterContext callbackContext;
+ private final OrcFile.EncodingStrategy encodingStrategy;
WriterImpl(FileSystem fs,
Path path,
@@ -146,7 +149,8 @@
MemoryManager memoryManager,
boolean addBlockPadding,
OrcFile.Version version,
- OrcFile.WriterCallback callback) throws IOException {
+ OrcFile.WriterCallback callback,
+ OrcFile.EncodingStrategy encodingStrategy) throws IOException {
this.fs = fs;
this.path = path;
this.conf = conf;
@@ -164,6 +168,7 @@ public Writer getWriter() {
}
this.stripeSize = stripeSize;
this.version = version;
+ this.encodingStrategy = encodingStrategy;
this.addBlockPadding = addBlockPadding;
// pick large block size to minimize block over or under hangs
this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
@@ -404,6 +409,14 @@ public boolean isCompressed() {
}
/**
+ * Get the encoding strategy to use.
+ * @return encoding strategy
+ */
+ public EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ /**
* Get the writer's configuration.
* @return configuration
*/
@@ -497,9 +510,14 @@ protected ColumnStatisticsImpl getFileStatistics() {
}
IntegerWriter createIntegerWriter(PositionedOutputStream output,
- boolean signed, boolean isDirectV2) {
+ boolean signed, boolean isDirectV2,
+ StreamFactory writer) {
if (isDirectV2) {
- return new RunLengthIntegerWriterV2(output, signed);
+ boolean alignedBitpacking = false;
+ if (writer.getEncodingStrategy().equals(EncodingStrategy.SPEED)) {
+ alignedBitpacking = true;
+ }
+ return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
} else {
return new RunLengthIntegerWriter(output, signed);
}
@@ -744,7 +762,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2);
+ this.writer = createIntegerWriter(out, true, isDirectV2, writer);
if (inspector instanceof IntObjectInspector) {
intInspector = (IntObjectInspector) inspector;
shortInspector = null;
@@ -806,6 +824,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class FloatTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
+ private final SerializationUtils utils;
FloatTreeWriter(int columnId,
ObjectInspector inspector,
@@ -814,6 +833,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
+ this.utils = new SerializationUtils();
recordPosition(rowIndexPosition);
}
@@ -823,7 +843,7 @@ void write(Object obj) throws IOException {
if (obj != null) {
float val = ((FloatObjectInspector) inspector).get(obj);
indexStatistics.updateDouble(val);
- SerializationUtils.writeFloat(stream, val);
+ utils.writeFloat(stream, val);
}
}
@@ -844,6 +864,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
private static class DoubleTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
+ private final SerializationUtils utils;
DoubleTreeWriter(int columnId,
ObjectInspector inspector,
@@ -852,6 +873,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
+ this.utils = new SerializationUtils();
recordPosition(rowIndexPosition);
}
@@ -861,7 +883,7 @@ void write(Object obj) throws IOException {
if (obj != null) {
double val = ((DoubleObjectInspector) inspector).get(obj);
indexStatistics.updateDouble(val);
- SerializationUtils.writeDouble(stream, val);
+ utils.writeDouble(stream, val);
}
}
@@ -909,15 +931,15 @@ void recordPosition(PositionRecorder recorder) throws IOException {
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
lengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
rowOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false, isDirectV2);
+ OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
directLengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
@@ -1129,7 +1151,7 @@ String getStringValue(Object obj) {
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
this.length = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
}
@@ -1188,9 +1210,9 @@ void recordPosition(PositionRecorder recorder) throws IOException {
super(columnId, inspector, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
this.seconds = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true, isDirectV2);
+ OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
this.nanos = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false, isDirectV2);
+ OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
}
@@ -1261,7 +1283,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2);
+ this.writer = createIntegerWriter(out, true, isDirectV2, writer);
recordPosition(rowIndexPosition);
}
@@ -1314,7 +1336,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
this.isDirectV2 = isNewWriteFormat(writer);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
this.scaleStream = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true, isDirectV2);
+ OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
recordPosition(rowIndexPosition);
}
@@ -1419,7 +1441,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
createTreeWriter(listObjectInspector.getListElementObjectInspector(),
writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
}
@@ -1481,7 +1503,7 @@ void recordPosition(PositionRecorder recorder) throws IOException {
childrenWriters[1] =
createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
index d389525..41a807b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
@@ -20,24 +20,54 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
public class TestBitPack {
private static final int SIZE = 100;
private static Random rand = new Random(100);
+ Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+ + File.separator + "tmp"));
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
private long[] deltaEncode(long[] inp) {
long[] output = new long[inp.length];
- for(int i = 0; i < inp.length; i++) {
- output[i] = SerializationUtils.zigzagEncode(inp[i]);
+ SerializationUtils utils = new SerializationUtils();
+ for (int i = 0; i < inp.length; i++) {
+ output[i] = utils.zigzagEncode(inp[i]);
}
return output;
}
@@ -53,7 +83,7 @@ private long nextLong(Random rng, long n) {
private void runTest(int numBits) throws IOException {
long[] inp = new long[SIZE];
- for(int i = 0; i < SIZE; i++) {
+ for (int i = 0; i < SIZE; i++) {
long val = 0;
if (numBits <= 32) {
if (numBits == 1) {
@@ -73,24 +103,20 @@ private void runTest(int numBits) throws IOException {
long minInput = Collections.min(Longs.asList(deltaEncoded));
long maxInput = Collections.max(Longs.asList(deltaEncoded));
long rangeInput = maxInput - minInput;
- int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+ SerializationUtils utils = new SerializationUtils();
+ int fixedWidth = utils.findClosestNumBits(rangeInput);
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
OutStream output = new OutStream("test", SIZE, null, collect);
- SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length,
- fixedWidth, output);
+ utils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output);
output.flush();
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
long[] buff = new long[SIZE];
- SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
- InStream.create("test",
- new ByteBuffer[]{inBuf},
- new long[]{0},
- inBuf.remaining(),
- null, SIZE));
- for(int i = 0; i < SIZE; i++) {
- buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+ utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf },
+ new long[] { 0 }, inBuf.remaining(), null, SIZE));
+ for (int i = 0; i < SIZE; i++) {
+ buff[i] = utils.zigzagDecode(buff[i]);
}
assertEquals(numBits, fixedWidth);
assertArrayEquals(inp, buff);
@@ -255,4 +281,36 @@ public void test56BitPacking56Bit() throws IOException {
public void test64BitPacking64Bit() throws IOException {
runTest(64);
}
+
+ @Test
+ public void testBitPack64Large() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ int size = 1080832;
+ long[] inp = new long[size];
+ Random rand = new Random(1234);
+ for (int i = 0; i < size; i++) {
+ inp[i] = rand.nextLong();
+ }
+ List input = Lists.newArrayList(Longs.asList(inp));
+
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB));
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+ RecordReader rows = reader.rows();
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
}
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
index 6869320..6d6f132 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -21,12 +21,15 @@
import java.io.File;
import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -36,12 +39,29 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
+@RunWith(value = Parameterized.class)
public class TestNewIntegerEncoding {
+ private EncodingStrategy encodingStrategy;
+
+ public TestNewIntegerEncoding(EncodingStrategy es) {
+ this.encodingStrategy = es;
+ }
+
+ @Parameters
+ public static Collection