diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 89c9349..cc025ca 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -524,6 +524,7 @@
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+    HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false),
 
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 420d959..1433b17 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1893,6 +1893,14 @@
 </property>
 
 <property>
+  <name>hive.exec.orc.skip.corrupt.data</name>
+  <value>false</value>
+  <description>If the ORC reader encounters corrupt data, this value will be used to determine
+  whether to skip the corrupt data or throw an exception. The default behavior is to throw an exception.
+  </description>
+</property>
+
+<property>
   <name>hive.multi.insert.move.tasks.share.dependencies</name>
   <value>false</value>
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index f8be581..859211a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -25,6 +25,7 @@
 import java.util.regex.Pattern;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
@@ -431,7 +432,9 @@
   STATSAGGREGATOR_MISSED_SOMESTATS(30016,
       "Stats type {0} is missing from stats aggregator. If you don't want the query " +
       "to fail because of this, set hive.stats.atomic=false", true),
-  STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true);
+  STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
+  ORC_CORRUPTED_READ(30018, "Corruption in ORC data encountered. To skip corrupted data" +
To skip corrupted data" + + " while reading set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true"); ; private int errorCode; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java index 5305e00..2ec7856 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java @@ -20,6 +20,7 @@ import java.io.EOFException; import java.io.IOException; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; @@ -163,14 +164,23 @@ private void readPatchedBaseValues(int firstByte) throws IOException { // unpack the patch blob long[] unpackedPatch = new long[pl]; - SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input); + + // TODO: add a condition to check if hive.exec.orc.skip.corrupt.data flag is set + // HIVE-6347 will add hive config object to reader interface which can be used to read the + // value set for hive.exec.orc.skip.corrupt.data + if ((pw + pgw) > 64) { + throw new IOException(ErrorMsg.ORC_CORRUPTED_READ.getMsg()); + } + int bitSize = SerializationUtils.getClosestFixedBits(pw + pgw); + SerializationUtils.readInts(unpackedPatch, 0, pl, bitSize, input); // apply the patch directly when decoding the packed data int patchIdx = 0; long currGap = 0; long currPatch = 0; + long patchMask = ((1L << pw) - 1); currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + currPatch = unpackedPatch[patchIdx] & patchMask; long actualGap = 0; // special case: gap is >255 then patch value will be 0. @@ -179,7 +189,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { actualGap += 255; patchIdx++; currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + currPatch = unpackedPatch[patchIdx] & patchMask; } // add the left over gap actualGap += currGap; @@ -199,7 +209,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { if (patchIdx < pl) { // read the next gap and patch currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + currPatch = unpackedPatch[patchIdx] & patchMask; actualGap = 0; // special case: gap is >255 then patch will be 0. if gap is @@ -208,7 +218,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException { actualGap += 255; patchIdx++; currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1); + currPatch = unpackedPatch[patchIdx] & patchMask; } // add the left over gap actualGap += currGap; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java index 3b684d7..9994cee 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java @@ -574,6 +574,15 @@ private void preparePatchedBlob() { patchWidth = brBits100p - brBits95p; patchWidth = SerializationUtils.getClosestFixedBits(patchWidth); + // if patch bit requirement is 64 then it will not possible to pack + // gap and patch together in a long. 
+    // packed together adjust the patch width
+    if (patchWidth == 64) {
+      patchWidth = 56;
+      brBits95p = 8;
+      mask = (1L << brBits95p) - 1;
+    }
+
     int gapIdx = 0;
     int patchIdx = 0;
     int prev = 0;
@@ -642,7 +651,7 @@ private void preparePatchedBlob() {
       long g = gapList[gapIdx++];
       long p = patchList[patchIdx++];
       while (g > 255) {
-        gapVsPatchList[i++] = (255 << patchWidth) | 0;
+        gapVsPatchList[i++] = (255L << patchWidth);
         g -= 255;
       }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
index dbb7641..2a7115d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -887,6 +887,165 @@ public void testPatchedBase511() throws Exception {
   }
 
   @Test
+  public void testPatchedBaseMax1() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(60));
+    }
+    input.set(511, Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseMax2() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(60));
+    }
+    input.set(128, Long.MAX_VALUE);
+    input.set(256, Long.MAX_VALUE);
+    input.set(511, Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseMax3() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    input.add(371946367L);
+    input.add(11963367L);
+    input.add(68639400007L);
+    input.add(100233367L);
+    input.add(6367L);
+    input.add(10026367L);
+    input.add(3670000L);
+    input.add(3602367L);
+    input.add(4719226367L);
+    input.add(7196367L);
+    input.add(444442L);
+    input.add(210267L);
+    input.add(21033L);
+    input.add(160267L);
+    input.add(400267L);
+    input.add(23634347L);
+    input.add(16027L);
+    input.add(46026367L);
+    input.add(Long.MAX_VALUE);
+    input.add(33333L);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
+  public void testPatchedBaseMax4() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    List<Long> input = Lists.newArrayList();
+    for (int i = 0; i < 25; i++) {
+      input.add(371292224226367L);
+      input.add(119622332222267L);
+      input.add(686329400222007L);
+      input.add(100233333222367L);
+      input.add(636272333322222L);
+      input.add(10202633223267L);
+      input.add(36700222022230L);
+      input.add(36023226224227L);
+      input.add(47192226364427L);
+      input.add(71963622222447L);
+      input.add(22244444222222L);
+      input.add(21220263327442L);
+      input.add(21032233332232L);
+      input.add(16026322232227L);
+      input.add(40022262272212L);
+      input.add(23634342227222L);
+      input.add(16022222222227L);
+      input.add(46026362222227L);
+      input.add(46026362222227L);
+      input.add(33322222222323L);
+    }
+    input.add(Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(fs, testFilePath);
+    RecordReader rows = reader.rows(null);
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
+
+  @Test
   public void testPatchedBaseTimestamp() throws Exception {
     ObjectInspector inspector;
     synchronized (TestOrcFile.class) {
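
Background on the arithmetic changes above: Java evaluates 1 << pw and 255 << patchWidth in 32-bit int arithmetic, where the shift count is reduced modulo 32, so the original reader and writer expressions silently produce a wrong mask or gap once the patch width goes past 31 bits, which is exactly what happens when values near Long.MAX_VALUE force a wide patch. The snippet below is a minimal standalone illustration of that overflow; it is not part of the patch, and the class and variable names are made up for the demo.

// PatchMaskDemo.java - illustrative only; shows why the patch promotes the
// mask and gap computations to long arithmetic.
public class PatchMaskDemo {
  public static void main(String[] args) {
    int pw = 40;  // a patch width that needs more than 32 bits

    // int shift: the count is taken modulo 32, so 1 << 40 behaves like 1 << 8
    long wrongMask = (1 << pw) - 1;   // 0xff, keeps only the low 8 bits
    long rightMask = (1L << pw) - 1;  // 0xffffffffff, the intended 40-bit mask
    System.out.println(Long.toHexString(wrongMask));
    System.out.println(Long.toHexString(rightMask));

    int patchWidth = 56;
    // 255 << 56 wraps to 255 << 24 and overflows int before widening to long
    long wrongGap = 255 << patchWidth;   // 0xffffffffff000000 after sign extension
    long rightGap = 255L << patchWidth;  // 0xff00000000000000, gap kept in the top byte
    System.out.println(Long.toHexString(wrongGap));
    System.out.println(Long.toHexString(rightGap));
  }
}

On the configuration side, this patch only defines hive.exec.orc.skip.corrupt.data (default false); as the TODO in RunLengthIntegerReaderV2 notes, wiring the flag into the reader is deferred to HIVE-6347, so with this patch alone the reader always throws ErrorMsg.ORC_CORRUPTED_READ when (pw + pgw) > 64. Once that wiring lands, the intended usage would be "set hive.exec.orc.skip.corrupt.data=true;" in the Hive session.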