diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ad3d59c..21d0c62 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -526,6 +526,7 @@
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+ HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false),
HIVESKEWJOIN("hive.optimize.skewjoin", false),
HIVECONVERTJOIN("hive.auto.convert.join", true),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 420d959..1433b17 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1893,6 +1893,14 @@
+ <property>
+   <name>hive.exec.orc.skip.corrupt.data</name>
+   <value>false</value>
+   <description>If the ORC reader encounters corrupt data, this value determines whether to
+   skip the corrupt data or throw an exception. The default behavior is to throw an exception.
+   </description>
+ </property>
+
  <name>hive.multi.insert.move.tasks.share.dependencies</name>
  <value>false</value>
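
For context, a minimal sketch (not part of the patch) of how the new flag could be toggled programmatically through HiveConf; the class name OrcSkipCorruptExample is made up for illustration, and the SQL-session equivalent would simply be SET hive.exec.orc.skip.corrupt.data=true.

import org.apache.hadoop.hive.conf.HiveConf;

public class OrcSkipCorruptExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Default is false: corrupt ORC data causes the reader to throw.
    boolean skip = conf.getBoolVar(HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
    System.out.println("skip corrupt data (default): " + skip);
    // Opt in to skipping corrupt data instead of failing the read.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA, true);
  }
}
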
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index f8be581..859211a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -25,6 +25,7 @@
import java.util.regex.Pattern;
import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
@@ -431,7 +432,9 @@
STATSAGGREGATOR_MISSED_SOMESTATS(30016,
"Stats type {0} is missing from stats aggregator. If you don't want the query " +
"to fail because of this, set hive.stats.atomic=false", true),
- STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true);
+ STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
+ ORC_CORRUPTED_READ(30018, "Corruption in ORC data encountered. To skip corrupted data"
+ + " while reading, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true")
;
private int errorCode;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
index 5305e00..2ec7856 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -20,6 +20,7 @@
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
@@ -163,14 +164,23 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
// unpack the patch blob
long[] unpackedPatch = new long[pl];
- SerializationUtils.readInts(unpackedPatch, 0, pl, pw + pgw, input);
+
+ // TODO: add a condition to check whether the hive.exec.orc.skip.corrupt.data flag is set.
+ // HIVE-6347 will add the Hive config object to the reader interface, which can then be used
+ // to read the value of hive.exec.orc.skip.corrupt.data.
+ if ((pw + pgw) > 64) {
+ throw new IOException(ErrorMsg.ORC_CORRUPTED_READ.getMsg());
+ }
+ int bitSize = SerializationUtils.getClosestFixedBits(pw + pgw);
+ SerializationUtils.readInts(unpackedPatch, 0, pl, bitSize, input);
// apply the patch directly when decoding the packed data
int patchIdx = 0;
long currGap = 0;
long currPatch = 0;
+ long patchMask = ((1L << pw) - 1);
currGap = unpackedPatch[patchIdx] >>> pw;
- currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ currPatch = unpackedPatch[patchIdx] & patchMask;
long actualGap = 0;
// special case: gap is >255 then patch value will be 0.
@@ -179,7 +189,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
actualGap += 255;
patchIdx++;
currGap = unpackedPatch[patchIdx] >>> pw;
- currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ currPatch = unpackedPatch[patchIdx] & patchMask;
}
// add the left over gap
actualGap += currGap;
@@ -199,7 +209,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
if (patchIdx < pl) {
// read the next gap and patch
currGap = unpackedPatch[patchIdx] >>> pw;
- currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ currPatch = unpackedPatch[patchIdx] & patchMask;
actualGap = 0;
// special case: gap is >255 then patch will be 0. if gap is
@@ -208,7 +218,7 @@ private void readPatchedBaseValues(int firstByte) throws IOException {
actualGap += 255;
patchIdx++;
currGap = unpackedPatch[patchIdx] >>> pw;
- currPatch = unpackedPatch[patchIdx] & ((1 << pw) - 1);
+ currPatch = unpackedPatch[patchIdx] & patchMask;
}
// add the left over gap
actualGap += currGap;
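
To make the reader-side change concrete, here is a standalone sketch (illustrative only, not part of the patch; PatchMaskDemo is a made-up name) of why the patch mask must be built with a long literal: with an int literal the shift distance wraps modulo 32, so any patch width of 32 bits or more yields a truncated mask. The new pw + pgw > 64 guard exists because the gap and the patch share a single 64-bit slot.

public class PatchMaskDemo {
  public static void main(String[] args) {
    int pw = 40;                                 // patch width in bits for a very large value
    long intMask = ((1 << pw) - 1);              // old mask: int shift wraps to (1 << 8) - 1 = 255
    long longMask = ((1L << pw) - 1);            // fixed mask: a true 40-bit mask
    long packed = 0xABCDEF012345L;               // a packed gap-plus-patch entry
    System.out.println(Long.toHexString(packed & intMask));   // 45  (high bits of the patch lost)
    System.out.println(Long.toHexString(packed & longMask));  // cdef012345  (the full 40-bit patch)
  }
}
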
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
index 3b684d7..ccc3728 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
@@ -567,6 +567,10 @@ private void preparePatchedBlob() {
// since we are considering only the 95th percentile, the gap and
// patch arrays can contain at most 5% of the values
patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+
+ // Add one as a safeguard against off-by-one errors, which would otherwise
+ // throw ArrayIndexOutOfBoundsException
+ patchLength += 1;
int[] gapList = new int[patchLength];
long[] patchList = new long[patchLength];
@@ -574,6 +578,15 @@ private void preparePatchedBlob() {
patchWidth = brBits100p - brBits95p;
patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+ // if the patch bit requirement is 64, it will not be possible to pack the
+ // gap and the patch together in a single long. To make sure they can be
+ // packed together, adjust the patch width
+ if (patchWidth == 64) {
+ patchWidth = 56;
+ brBits95p = 8;
+ mask = (1L << brBits95p) - 1;
+ }
+
int gapIdx = 0;
int patchIdx = 0;
int prev = 0;
@@ -642,7 +655,7 @@ private void preparePatchedBlob() {
long g = gapList[gapIdx++];
long p = patchList[patchIdx++];
while (g > 255) {
- gapVsPatchList[i++] = (255 << patchWidth) | 0;
+ gapVsPatchList[i++] = (255L << patchWidth);
g -= 255;
}
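
The writer-side adjustment follows from the same packing constraint: each gapVsPatchList entry stores the gap in the upper bits and the patch in the lower patchWidth bits of one long, and gaps are capped at 255 (8 bits). A short sketch (illustrative only; GapPatchPackingDemo is a made-up name) of the 64-to-56 width adjustment and the pack/unpack round trip:

public class GapPatchPackingDemo {
  public static void main(String[] args) {
    // Each gapVsPatchList entry packs the gap into the upper bits and the patch
    // into the lower patchWidth bits of one long, so gap bits + patch bits <= 64.
    int patchWidth = 64;   // bits the patch portion would need in the worst case

    // A 64-bit patch leaves no room for the gap, so the writer narrows the patch
    // to 56 bits and moves the remaining 8 bits into the base-reduced literals.
    if (patchWidth == 64) {
      patchWidth = 56;
    }

    long gap = 200;                              // gaps are capped at 255, i.e. at most 8 bits
    long patch = (1L << patchWidth) - 1;         // worst-case patch value for the chosen width
    long packed = (gap << patchWidth) | patch;

    // Unpacking mirrors RunLengthIntegerReaderV2: shift for the gap, mask for the patch.
    long patchMask = (1L << patchWidth) - 1;
    System.out.println((packed >>> patchWidth) == gap);  // true
    System.out.println((packed & patchMask) == patch);   // true
  }
}
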
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
index dbb7641..2a7115d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -887,6 +887,165 @@ public void testPatchedBase511() throws Exception {
}
@Test
+ public void testPatchedBaseMax1() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(60));
+ }
+ input.set(511, Long.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseMax2() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ Random rand = new Random();
+ for (int i = 0; i < 5120; i++) {
+ input.add((long) rand.nextInt(60));
+ }
+ input.set(128, Long.MAX_VALUE);
+ input.set(256, Long.MAX_VALUE);
+ input.set(511, Long.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseMax3() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ input.add(371946367L);
+ input.add(11963367L);
+ input.add(68639400007L);
+ input.add(100233367L);
+ input.add(6367L);
+ input.add(10026367L);
+ input.add(3670000L);
+ input.add(3602367L);
+ input.add(4719226367L);
+ input.add(7196367L);
+ input.add(444442L);
+ input.add(210267L);
+ input.add(21033L);
+ input.add(160267L);
+ input.add(400267L);
+ input.add(23634347L);
+ input.add(16027L);
+ input.add(46026367L);
+ input.add(Long.MAX_VALUE);
+ input.add(33333L);
+
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
+ public void testPatchedBaseMax4() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ List<Long> input = Lists.newArrayList();
+ for (int i = 0; i < 25; i++) {
+ input.add(371292224226367L);
+ input.add(119622332222267L);
+ input.add(686329400222007L);
+ input.add(100233333222367L);
+ input.add(636272333322222L);
+ input.add(10202633223267L);
+ input.add(36700222022230L);
+ input.add(36023226224227L);
+ input.add(47192226364427L);
+ input.add(71963622222447L);
+ input.add(22244444222222L);
+ input.add(21220263327442L);
+ input.add(21032233332232L);
+ input.add(16026322232227L);
+ input.add(40022262272212L);
+ input.add(23634342227222L);
+ input.add(16022222222227L);
+ input.add(46026362222227L);
+ input.add(46026362222227L);
+ input.add(33322222222323L);
+ }
+ input.add(Long.MAX_VALUE);
+
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
+ for (Long l : input) {
+ writer.addRow(l);
+ }
+ writer.close();
+
+ Reader reader = OrcFile.createReader(fs, testFilePath);
+ RecordReader rows = reader.rows(null);
+ int idx = 0;
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+ }
+ }
+
+ @Test
public void testPatchedBaseTimestamp() throws Exception {
ObjectInspector inspector;
synchronized (TestOrcFile.class) {