diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a182cd7..237b669 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -533,6 +533,7 @@
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+    HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false),
 
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 0d08aa2..f7f50e3 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1933,6 +1933,14 @@
 </property>
 
 <property>
+  <name>hive.exec.orc.skip.corrupt.data</name>
+  <value>false</value>
+  <description>If the ORC reader encounters corrupt data, this value will be used to determine
+  whether to skip the corrupt data or throw an exception. The default behavior is to throw an exception.
+  </description>
+</property>
+
+<property>
   <name>hive.multi.insert.move.tasks.share.dependencies</name>
   <value>false</value>
   <description>
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index f8be581..208e9f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -25,6 +25,7 @@
 import java.util.regex.Pattern;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
@@ -431,7 +432,9 @@
   STATSAGGREGATOR_MISSED_SOMESTATS(30016,
       "Stats type {0} is missing from stats aggregator. If you don't want the query " +
       "to fail because of this, set hive.stats.atomic=false", true),
-  STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true);
+  STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
+  ORC_CORRUPTED_READ(30018, "Corruption in ORC data encountered. To skip reading corrupted " +
+      "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true");
   ;
 
   private int errorCode;
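
The three hunks above introduce the new flag, document it in the template, and give the reader an error message that points at it. A minimal sketch of how the flag is expected to be consumed, using only the HiveConf and Configuration calls that appear later in this patch (the class name and main() harness below are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

// Illustrative only: shows how a reader-side component can consult the flag.
public class SkipCorruptFlagExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt in to skipping corrupt PATCHED_BASE data instead of failing the task.
    conf.setBoolean("hive.exec.orc.skip.corrupt.data", true);

    // getBoolVar falls back to the ConfVars default (false) when the key is
    // unset, so existing jobs keep the fail-fast behavior.
    boolean skipCorrupt = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
    System.out.println("skip corrupt ORC data: " + skipCorrupt);
  }
}

Note that the ORC_CORRUPTED_READ message concatenates the ConfVars constant directly; its toString() resolves to the configuration key, so the message tells users to set hive.exec.orc.skip.corrupt.data rather than the enum name.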
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 00d38f2..a56fe2f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -114,14 +114,15 @@ private OrcFile() {}
    * @return a new ORC file reader.
    * @throws IOException
    */
-  public static Reader createReader(FileSystem fs, Path path
-                                    ) throws IOException {
-    return new ReaderImpl(fs, path);
+  public static Reader createReader(FileSystem fs, Path path,
+                                    Configuration conf) throws IOException {
+    return new ReaderImpl(fs, path, conf);
   }
 
-  public static Reader createReader(FileSystem fs, Path path, FileMetaInfo fileMetaInfo)
+  public static Reader createReader(FileSystem fs, Path path,
+      FileMetaInfo fileMetaInfo, Configuration conf)
       throws IOException {
-    return new ReaderImpl(fs, path, fileMetaInfo);
+    return new ReaderImpl(fs, path, fileMetaInfo, conf);
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 56f25b7..a34a6ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +61,7 @@
   private final OrcProto.Footer footer;
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
+  private final Configuration conf;
 
   //serialized footer - Keeping this around for use by getFileMetaInfo()
   // will help avoid cpu cycles spend in deserializing at cost of increased
@@ -288,11 +290,13 @@ static void checkOrcVersion(Log log, Path path, List<Integer> version) {
    * Constructor that extracts metadata information from file footer
    * @param fs
    * @param path
+   * @param conf
    * @throws IOException
    */
-  ReaderImpl(FileSystem fs, Path path) throws IOException {
+  ReaderImpl(FileSystem fs, Path path, Configuration conf) throws IOException {
     this.fileSystem = fs;
     this.path = path;
+    this.conf = conf;
 
     FileMetaInfo footerMetaData = extractMetaInfoFromFooter(fs, path);
 
@@ -316,12 +320,14 @@ static void checkOrcVersion(Log log, Path path, List<Integer> version) {
    * @param fs
    * @param path
    * @param fMetaInfo
+   * @param conf
    * @throws IOException
    */
-  ReaderImpl(FileSystem fs, Path path, FileMetaInfo fMetaInfo)
+  ReaderImpl(FileSystem fs, Path path, FileMetaInfo fMetaInfo, Configuration conf)
       throws IOException {
     this.fileSystem = fs;
     this.path = path;
+    this.conf = conf;
 
     MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(
         fMetaInfo.compressionType,
@@ -487,7 +493,7 @@ public RecordReader rows(long offset, long length, boolean[] include,
     return new RecordReaderImpl(this.getStripes(), fileSystem, path,
         offset, length,
         footer.getTypesList(), codec, bufferSize,
-        include, footer.getRowIndexStride(), sarg, columnNames);
+        include, footer.getRowIndexStride(), sarg, columnNames, conf);
   }
 
   @Override
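
Both createReader overloads now require a Configuration, so every call site has to be updated. A hedged sketch of the new call pattern, assuming a Hive build that contains this patch (the path and the local file system below are placeholders; rows(null) reads all columns, mirroring the new tests at the end of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;

public class CreateReaderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);   // local FS, for the sketch only
    Path path = new Path("/tmp/example.orc");    // hypothetical ORC file

    // The Configuration is threaded through ReaderImpl to RecordReaderImpl and,
    // from there, to RunLengthIntegerReaderV2, which checks
    // hive.exec.orc.skip.corrupt.data.
    Reader reader = OrcFile.createReader(fs, path, conf);
    RecordReader rows = reader.rows(null);       // null = include all columns
    while (rows.hasNext()) {
      Object row = rows.next(null);
      // process row...
    }
    rows.close();
  }
}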
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index c3c9685..7798a7c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -31,6 +31,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -84,6 +85,7 @@
   private final int[] filterColumns;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
+  private final Configuration conf;
 
   RecordReaderImpl(Iterable<StripeInformation> stripes,
                    FileSystem fileSystem,
@@ -95,13 +97,15 @@
                    boolean[] included,
                    long strideRate,
                    SearchArgument sarg,
-                   String[] columnNames
+                   String[] columnNames,
+                   Configuration conf
                    ) throws IOException {
     this.file = fileSystem.open(path);
     this.codec = codec;
     this.types = types;
     this.bufferSize = bufferSize;
     this.included = included;
+    this.conf = conf;
     this.sarg = sarg;
     if (sarg != null) {
       sargLeaves = sarg.getLeaves();
@@ -128,7 +132,7 @@
 
     firstRow = skippedRows;
     totalRowCount = rows;
-    reader = createTreeReader(path, 0, types, included);
+    reader = createTreeReader(path, 0, types, included, conf);
     indexes = new OrcProto.RowIndex[types.size()];
     rowIndexStride = strideRate;
     advanceToNextRow(0L);
@@ -163,10 +167,12 @@ public long getNext() {
     protected final int columnId;
     private BitFieldReader present = null;
     protected boolean valuePresent = false;
+    protected final Configuration conf;
 
-    TreeReader(Path path, int columnId) {
+    TreeReader(Path path, int columnId, Configuration conf) {
       this.path = path;
       this.columnId = columnId;
+      this.conf = conf;
     }
 
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
@@ -182,7 +188,7 @@ IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
       switch (kind) {
       case DIRECT_V2:
       case DICTIONARY_V2:
-        return new RunLengthIntegerReaderV2(in, signed);
+        return new RunLengthIntegerReaderV2(in, signed, conf);
       case DIRECT:
      case DICTIONARY:
         return new RunLengthIntegerReader(in, signed);
@@ -275,8 +281,8 @@ Object nextVector(Object previousVector, long batchSize) throws IOException {
   private static class BooleanTreeReader extends TreeReader{
     private BitFieldReader reader = null;
 
-    BooleanTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    BooleanTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -335,8 +341,8 @@ Object nextVector(Object previousVector, long batchSize) throws IOException {
   private static class ByteTreeReader extends TreeReader{
     private RunLengthByteReader reader = null;
 
-    ByteTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    ByteTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -395,8 +401,8 @@ void skipRows(long items) throws IOException {
   private static class ShortTreeReader extends TreeReader{
     private IntegerReader reader = null;
 
-    ShortTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    ShortTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -465,8 +471,8 @@ void skipRows(long items) throws IOException {
   private static class IntTreeReader extends TreeReader{
     private IntegerReader reader = null;
 
-    IntTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    IntTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -535,8 +541,8 @@ void skipRows(long items) throws IOException {
   private static class LongTreeReader extends TreeReader{
     private IntegerReader reader = null;
 
-    LongTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    LongTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
    }
 
     @Override
@@ -605,8 +611,8 @@ void skipRows(long items) throws IOException {
   private static class FloatTreeReader extends TreeReader{
     private InStream stream;
 
-    FloatTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    FloatTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -685,8 +691,8 @@ void skipRows(long items) throws IOException {
   private static class DoubleTreeReader extends TreeReader{
     private InStream stream;
 
-    DoubleTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    DoubleTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -764,8 +770,8 @@ void skipRows(long items) throws IOException {
     private InStream stream;
     private IntegerReader lengths = null;
 
-    BinaryTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    BinaryTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -843,8 +849,8 @@ void skipRows(long items) throws IOException {
     private IntegerReader nanos = null;
     private final LongColumnVector nanoVector = new LongColumnVector();
 
-    TimestampTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    TimestampTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -970,8 +976,8 @@ void skipRows(long items) throws IOException {
   private static class DateTreeReader extends TreeReader{
     private IntegerReader reader = null;
 
-    DateTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    DateTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -1045,8 +1051,8 @@ void skipRows(long items) throws IOException {
     private final int precision;
     private final int scale;
 
-    DecimalTreeReader(Path path, int columnId, int precision, int scale) {
-      super(path, columnId);
+    DecimalTreeReader(Path path, int columnId, int precision, int scale, Configuration conf) {
+      super(path, columnId, conf);
       this.precision = precision;
       this.scale = scale;
     }
@@ -1155,8 +1161,8 @@ void skipRows(long items) throws IOException {
   private static class StringTreeReader extends TreeReader {
     private TreeReader reader;
 
-    StringTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    StringTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
     }
 
     @Override
@@ -1173,11 +1179,11 @@ void startStripe(Map<StreamName, InStream> streams,
       switch (encodings.get(columnId).getKind()) {
       case DIRECT:
       case DIRECT_V2:
-        reader = new StringDirectTreeReader(path, columnId);
+        reader = new StringDirectTreeReader(path, columnId, conf);
         break;
       case DICTIONARY:
       case DICTIONARY_V2:
-        reader = new StringDictionaryTreeReader(path, columnId);
+        reader = new StringDictionaryTreeReader(path, columnId, conf);
         break;
       default:
         throw new IllegalArgumentException("Unsupported encoding " +
@@ -1217,8 +1223,8 @@ void skipRows(long items) throws IOException {
 
     private final LongColumnVector scratchlcv;
 
-    StringDirectTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    StringDirectTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
       scratchlcv = new LongColumnVector();
     }
 
@@ -1366,8 +1372,8 @@ void skipRows(long items) throws IOException {
     private byte[] dictionaryBufferInBytesCache = null;
     private final LongColumnVector scratchlcv;
 
-    StringDictionaryTreeReader(Path path, int columnId) {
-      super(path, columnId);
+    StringDictionaryTreeReader(Path path, int columnId, Configuration conf) {
+      super(path, columnId, conf);
       scratchlcv = new LongColumnVector();
     }
 
@@ -1532,8 +1538,8 @@ void skipRows(long items) throws IOException {
   private static class CharTreeReader extends StringTreeReader {
     int maxLength;
 
-    CharTreeReader(Path path, int columnId, int maxLength) {
-      super(path, columnId);
+    CharTreeReader(Path path, int columnId, int maxLength, Configuration conf) {
+      super(path, columnId, conf);
       this.maxLength = maxLength;
     }
 
@@ -1560,8 +1566,8 @@ Object next(Object previous) throws IOException {
   private static class VarcharTreeReader extends StringTreeReader {
     int maxLength;
 
-    VarcharTreeReader(Path path, int columnId, int maxLength) {
-      super(path, columnId);
+    VarcharTreeReader(Path path, int columnId, int maxLength, Configuration conf) {
+      super(path, columnId, conf);
       this.maxLength = maxLength;
     }
 
@@ -1591,8 +1597,8 @@ Object next(Object previous) throws IOException {
 
     StructTreeReader(Path path, int columnId,
                      List<OrcProto.Type> types,
-                     boolean[] included) throws IOException {
-      super(path, columnId);
+                     boolean[] included, Configuration conf) throws IOException {
+      super(path, columnId, conf);
       OrcProto.Type type = types.get(columnId);
       int fieldCount = type.getFieldNamesCount();
       this.fields = new TreeReader[fieldCount];
@@ -1600,7 +1606,7 @@ Object next(Object previous) throws IOException {
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(path, subtype, types, included);
+          this.fields[i] = createTreeReader(path, subtype, types, included, conf);
         }
         this.fieldNames[i] = type.getFieldNames(i);
       }
@@ -1693,15 +1699,15 @@ void skipRows(long items) throws IOException {
 
     UnionTreeReader(Path path, int columnId,
                     List<OrcProto.Type> types,
-                    boolean[] included) throws IOException {
-      super(path, columnId);
+                    boolean[] included, Configuration conf) throws IOException {
+      super(path, columnId, conf);
       OrcProto.Type type = types.get(columnId);
       int fieldCount = type.getSubtypesCount();
       this.fields = new TreeReader[fieldCount];
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(path, subtype, types, included);
+          this.fields[i] = createTreeReader(path, subtype, types, included, conf);
         }
       }
     }
@@ -1772,11 +1778,11 @@ void skipRows(long items) throws IOException {
 
     ListTreeReader(Path path, int columnId,
                    List<OrcProto.Type> types,
-                   boolean[] included) throws IOException {
-      super(path, columnId);
+                   boolean[] included, Configuration conf) throws IOException {
+      super(path, columnId, conf);
       OrcProto.Type type = types.get(columnId);
       elementReader = createTreeReader(path, type.getSubtypes(0), types,
-          included);
+          included, conf);
     }
 
     @Override
@@ -1863,18 +1869,18 @@ void skipRows(long items) throws IOException {
 
     MapTreeReader(Path path, int columnId,
                   List<OrcProto.Type> types,
-                  boolean[] included) throws IOException {
-      super(path, columnId);
+                  boolean[] included, Configuration conf) throws IOException {
+      super(path, columnId, conf);
       OrcProto.Type type = types.get(columnId);
       int keyColumn = type.getSubtypes(0);
       int valueColumn = type.getSubtypes(1);
       if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(path, keyColumn, types, included);
+        keyReader = createTreeReader(path, keyColumn, types, included, conf);
       } else {
         keyReader = null;
       }
       if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(path, valueColumn, types, included);
+        valueReader = createTreeReader(path, valueColumn, types, included, conf);
       } else {
         valueReader = null;
       }
@@ -1956,54 +1962,55 @@ void skipRows(long items) throws IOException {
   private static TreeReader createTreeReader(Path path,
                                              int columnId,
                                              List<OrcProto.Type> types,
-                                             boolean[] included
+                                             boolean[] included,
+                                             Configuration conf
                                              ) throws IOException {
     OrcProto.Type type = types.get(columnId);
     switch (type.getKind()) {
     case BOOLEAN:
-      return new BooleanTreeReader(path, columnId);
+      return new BooleanTreeReader(path, columnId, conf);
     case BYTE:
-      return new ByteTreeReader(path, columnId);
+      return new ByteTreeReader(path, columnId, conf);
     case DOUBLE:
-      return new DoubleTreeReader(path, columnId);
+      return new DoubleTreeReader(path, columnId, conf);
     case FLOAT:
-      return new FloatTreeReader(path, columnId);
+      return new FloatTreeReader(path, columnId, conf);
     case SHORT:
-      return new ShortTreeReader(path, columnId);
+      return new ShortTreeReader(path, columnId, conf);
     case INT:
-      return new IntTreeReader(path, columnId);
+      return new IntTreeReader(path, columnId, conf);
     case LONG:
-      return new LongTreeReader(path, columnId);
+      return new LongTreeReader(path, columnId, conf);
     case STRING:
-      return new StringTreeReader(path, columnId);
+      return new StringTreeReader(path, columnId, conf);
     case CHAR:
       if (!type.hasMaximumLength()) {
         throw new IllegalArgumentException("ORC char type has no length specified");
       }
-      return new CharTreeReader(path, columnId, type.getMaximumLength());
+      return new CharTreeReader(path, columnId, type.getMaximumLength(), conf);
     case VARCHAR:
       if (!type.hasMaximumLength()) {
         throw new IllegalArgumentException("ORC varchar type has no length specified");
       }
-      return new VarcharTreeReader(path, columnId, type.getMaximumLength());
+      return new VarcharTreeReader(path, columnId, type.getMaximumLength(), conf);
     case BINARY:
-      return new BinaryTreeReader(path, columnId);
+      return new BinaryTreeReader(path, columnId, conf);
     case TIMESTAMP:
-      return new TimestampTreeReader(path, columnId);
+      return new TimestampTreeReader(path, columnId, conf);
     case DATE:
-      return new DateTreeReader(path, columnId);
+      return new DateTreeReader(path, columnId, conf);
     case DECIMAL:
       int precision = type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
       int scale = type.hasScale()? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
-      return new DecimalTreeReader(path, columnId, precision, scale);
+      return new DecimalTreeReader(path, columnId, precision, scale, conf);
     case STRUCT:
-      return new StructTreeReader(path, columnId, types, included);
+      return new StructTreeReader(path, columnId, types, included, conf);
     case LIST:
-      return new ListTreeReader(path, columnId, types, included);
+      return new ListTreeReader(path, columnId, types, included, conf);
     case MAP:
-      return new MapTreeReader(path, columnId, types, included);
+      return new MapTreeReader(path, columnId, types, included, conf);
     case UNION:
-      return new UnionTreeReader(path, columnId, types, included);
+      return new UnionTreeReader(path, columnId, types, included, conf);
     default:
       throw new IllegalArgumentException("Unsupported type " +
           type.getKind());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
index 5305e00..a7e66a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -20,6 +20,10 @@
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
 
@@ -34,10 +38,13 @@
   private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
   private int numLiterals = 0;
   private int used = 0;
+  private final boolean skipCorrupt;
 
-  RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException {
+  RunLengthIntegerReaderV2(InStream input, boolean signed,
+      Configuration conf) throws IOException {
     this.input = input;
     this.signed = signed;
+    this.skipCorrupt = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
   }
 
   private void readValues() throws IOException {
@@ -163,14 +170,20 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
 
     // unpack the patch blob
     long[] unpackedPatch = new long[pl];
-    SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input);
+
+    if ((pw + pgw) > 64 && !skipCorrupt) {
+      throw new IOException(ErrorMsg.ORC_CORRUPTED_READ.getMsg());
+    }
+    int bitSize = SerializationUtils.getClosestFixedBits(pw + pgw);
+    SerializationUtils.readInts(unpackedPatch, 0, pl, bitSize, input);
 
     // apply the patch directly when decoding the packed data
     int patchIdx = 0;
     long currGap = 0;
     long currPatch = 0;
+    long patchMask = ((1L << pw) - 1);
     currGap = unpackedPatch[patchIdx] >>> pw;
-    currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+    currPatch = unpackedPatch[patchIdx] & patchMask;
     long actualGap = 0;
 
     // special case: gap is >255 then patch value will be 0.
@@ -179,7 +192,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
       actualGap += 255;
       patchIdx++;
       currGap = unpackedPatch[patchIdx] >>> pw;
-      currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+      currPatch = unpackedPatch[patchIdx] & patchMask;
     }
     // add the left over gap
     actualGap += currGap;
@@ -199,7 +212,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
       if (patchIdx < pl) {
         // read the next gap and patch
         currGap = unpackedPatch[patchIdx] >>> pw;
-        currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+        currPatch = unpackedPatch[patchIdx] & patchMask;
         actualGap = 0;
 
         // special case: gap is >255 then patch will be 0. if gap is
@@ -208,7 +221,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
           actualGap += 255;
          patchIdx++;
           currGap = unpackedPatch[patchIdx] >>> pw;
-          currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+          currPatch = unpackedPatch[patchIdx] & patchMask;
         }
         // add the left over gap
         actualGap += currGap;
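
These hunks change how the patch blob of a PATCHED_BASE run is decoded. Each blob entry packs the gap to the next patched position in its high bits (entry >>> pw) and the patch bits in its low pw bits (entry & mask), so the combined width pw + pgw can never legitimately exceed 64; a stream that claims otherwise came from the pre-fix writer or is otherwise corrupt, and the reader now either fails with ORC_CORRUPTED_READ or, when hive.exec.orc.skip.corrupt.data is set, clamps the read width via getClosestFixedBits (which tops out at 64) and keeps going. The second part of the fix builds the mask with 1L rather than 1, because an int shift distance is taken modulo 32. A small self-contained illustration (not part of the patch; the constants are arbitrary):

public class GapPatchMaskExample {
  public static void main(String[] args) {
    int pw = 56;                                  // patch width (maximum after the writer fix)
    long gap = 3;                                 // gap to the next patched position
    long patch = 0x00ABCDEF01020304L;             // an arbitrary 56-bit patch value
    long entry = (gap << pw) | patch;             // one packed patch-blob entry

    long patchMask = (1L << pw) - 1;              // corrected 56-bit mask
    long badMask = (1 << pw) - 1;                 // old code: int shift, 56 & 31 == 24 -> 24-bit mask

    System.out.println(Long.toHexString(patchMask));   // ffffffffffffff
    System.out.println(Long.toHexString(badMask));     // ffffff
    System.out.println((entry >>> pw) == gap);         // true:  high bits decode the gap
    System.out.println((entry & patchMask) == patch);  // true:  low bits decode the patch
    System.out.println((entry & badMask) == patch);    // false: the old mask drops patch bits
  }
}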
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
index 3b684d7..ccc3728 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
@@ -567,6 +567,10 @@ private void preparePatchedBlob() {
     // since we are considering only 95 percentile, the size of gap and
     // patch array can contain only be 5% values
     patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+
+    // Adding as a safeguard to avoid off-by-one errors and avoid throwing
+    // ArrayIndexOutOfBoundsException
+    patchLength += 1;
 
     int[] gapList = new int[patchLength];
     long[] patchList = new long[patchLength];
@@ -574,6 +578,15 @@ private void preparePatchedBlob() {
     patchWidth = brBits100p - brBits95p;
     patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
 
+    // if the patch bit requirement is 64 then it will not be possible to pack
+    // gap and patch together in a long. To make sure gap and patch can be
+    // packed together, adjust the patch width
+    if (patchWidth == 64) {
+      patchWidth = 56;
+      brBits95p = 8;
+      mask = (1L << brBits95p) - 1;
+    }
+
     int gapIdx = 0;
     int patchIdx = 0;
     int prev = 0;
@@ -642,7 +655,7 @@ private void preparePatchedBlob() {
       long g = gapList[gapIdx++];
       long p = patchList[patchIdx++];
       while (g > 255) {
-        gapVsPatchList[i++] = (255 << patchWidth) | 0;
+        gapVsPatchList[i++] = (255L << patchWidth);
         g -= 255;
       }
 
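
On the writer side the patch makes three related changes: patchLength += 1 over-allocates one slot so the 95th-percentile estimate cannot overrun the gap/patch arrays; forcing brBits95p to 8 when the patch would otherwise need all 64 bits caps the patch width at 56, which leaves room for the gap (at most 8 bits, since larger gaps are emitted as repeated 255 markers) in the same 64-bit entry; and the 255 gap marker itself is now shifted as a long. The sketch below is not part of the patch and shows one representative width; the exact failure mode of the old int expression varies with patchWidth (for widths of 32 or more the shift distance wraps modulo 32, for 24 to 31 the value overflows the int range), but in each of those cases the packed entry is wrong:

public class WriterGapPatchShiftExample {
  public static void main(String[] args) {
    int patchWidth = 32;                      // one width getClosestFixedBits can return

    long fixed  = (255L << patchWidth);       // what the corrected hunk stores
    long broken = (255 << patchWidth) | 0;    // old code: int shift, 32 & 31 == 0, stays 255

    System.out.println(Long.toHexString(fixed));   // ff00000000
    System.out.println(Long.toHexString(broken));  // ff
    System.out.println(fixed >>> patchWidth);      // 255 -> decoder sees the gap marker
    System.out.println(broken >>> patchWidth);     // 0   -> the 255-gap marker is lost
  }
}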
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
index dbb7641..2a7115d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -887,6 +887,165 @@ public void testPatchedBase511() throws Exception {
   }
 
   @Test
+  public void testPatchedBaseMax1() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(60));
+    }
+    input.set(511, Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseMax2() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(60));
+    }
+    input.set(128, Long.MAX_VALUE);
+    input.set(256, Long.MAX_VALUE);
+    input.set(511, Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseMax3() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    input.add(371946367L);
+    input.add(11963367L);
+    input.add(68639400007L);
+    input.add(100233367L);
+    input.add(6367L);
+    input.add(10026367L);
+    input.add(3670000L);
+    input.add(3602367L);
+    input.add(4719226367L);
+    input.add(7196367L);
+    input.add(444442L);
+    input.add(210267L);
+    input.add(21033L);
+    input.add(160267L);
+    input.add(400267L);
+    input.add(23634347L);
+    input.add(16027L);
+    input.add(46026367L);
+    input.add(Long.MAX_VALUE);
+    input.add(33333L);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseMax4() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    for (int i = 0; i < 25; i++) {
+      input.add(371292224226367L);
+      input.add(119622332222267L);
+      input.add(686329400222007L);
+      input.add(100233333222367L);
+      input.add(636272333322222L);
+      input.add(10202633223267L);
+      input.add(36700222022230L);
+      input.add(36023226224227L);
+      input.add(47192226364427L);
+      input.add(71963622222447L);
+      input.add(22244444222222L);
+      input.add(21220263327442L);
+      input.add(21032233332232L);
+      input.add(16026322232227L);
+      input.add(40022262272212L);
+      input.add(23634342227222L);
+      input.add(16022222222227L);
+      input.add(46026362222227L);
+      input.add(46026362222227L);
+      input.add(33322222222323L);
+    }
+    input.add(Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
   public void testPatchedBaseTimestamp() throws Exception {
     ObjectInspector inspector;
     synchronized (TestOrcFile.class) {