Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1127229)
+++ conf/hive-default.xml (working copy)
@@ -1118,4 +1118,11 @@
of data sampled.
+
+<property>
+  <name>hive.io.exception.handlers</name>
+  <value></value>
+  <description>A list of IO exception handlers that are used to handle
+  exceptions thrown by record readers</description>
+</property>
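For reference, the value of this property is a comma-separated list of handler class names, which the HiveIOExceptionHandlerChain added below instantiates by reflection. A minimal site-level override might look like the following sketch; the handler class is hypothetical:

  <property>
    <name>hive.io.exception.handlers</name>
    <value>com.example.hive.EOFExceptionSkipHandler</value>
  </property>

The same setting can also be made per-session from the Hive CLI, e.g. set hive.io.exception.handlers=com.example.hive.EOFExceptionSkipHandler;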
Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 1101261)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (working copy)
@@ -20,6 +20,7 @@
import java.io.IOException;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
@@ -40,9 +41,11 @@
protected final InputFormat inputFormat;
protected final JobConf jobConf;
protected final Reporter reporter;
- protected RecordReader curReader;
+ protected RecordReader curReader;
protected long progress;
protected int idx;
+
+ protected HiveIOExceptionHandlerChain ioExceptionHandler;
public BucketizedHiveRecordReader(InputFormat inputFormat,
BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
@@ -51,6 +54,8 @@
this.inputFormat = inputFormat;
this.jobConf = jobConf;
this.reporter = reporter;
+ this.ioExceptionHandler = HiveIOExceptionHandlerChain
+ .getHiveIOExceptionHandlerChain(jobConf);
initNextRecordReader();
}
@@ -86,13 +91,21 @@
}
public boolean doNext(K key, V value) throws IOException {
- while ((curReader == null) || !curReader.next(key, value)) {
+ while ((curReader == null) || !doNextWithExceptionHandler(key, value)) {
if (!initNextRecordReader()) {
return false;
}
}
return true;
}
+
+ private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+ try {
+ return curReader.next(key, value);
+ } catch (Exception e) {
+ return handleNextException(e);
+ }
+ }
/**
* Get the record reader for the next chunk in this
@@ -117,9 +130,25 @@
curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
reporter);
} catch (Exception e) {
- throw new RuntimeException(e);
+ curReader = handleRecordReaderCreationException(e);
}
idx++;
return true;
}
+
+ private RecordReader handleRecordReaderCreationException(Exception e)
+ throws IOException {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderCreationException(e);
+ }
+ throw new IOException(e);
+ }
+
+ private boolean handleNextException(Exception e) throws IOException {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderNextException(e);
+ }
+ throw new IOException(e);
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 1101261)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy)
@@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -171,7 +172,9 @@
}
JobConf job;
-
+
+ protected HiveIOExceptionHandlerChain ioExceptionHandler;
+
public void configure(JobConf job) {
this.job = job;
}
@@ -204,6 +207,11 @@
Reporter reporter) throws IOException {
HiveInputSplit hsplit = (HiveInputSplit) split;
+
+ if (ioExceptionHandler == null) {
+ this.ioExceptionHandler = HiveIOExceptionHandlerChain
+ .getHiveIOExceptionHandlerChain(job);
+ }
InputSplit inputSplit = hsplit.getInputSplit();
String inputFormatClassName = null;
@@ -234,13 +242,25 @@
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
cloneJobConf);
- RecordReader innerReader = inputFormat.getRecordReader(inputSplit,
+ RecordReader innerReader = null;
+ try {
+ innerReader = inputFormat.getRecordReader(inputSplit,
cloneJobConf, reporter);
-
- HiveRecordReader rr = new HiveRecordReader(innerReader);
+ } catch (Exception e) {
+ innerReader = handleRecordReaderCreationException(e, inputFormat);
+ }
+ HiveRecordReader rr = new HiveRecordReader(innerReader, ioExceptionHandler);
rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
return rr;
}
+
+ private RecordReader handleRecordReaderCreationException(Exception e, InputFormat inputFormat)
+ throws IOException {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderCreationException(e);
+ }
+ throw new IOException(e);
+ }
protected Map<String, PartitionDesc> pathToPartitionInfo;
MapredWork mrwork = null;
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 1101261)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy)
@@ -20,9 +20,11 @@
import java.io.IOException;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.hive.ql.exec.ExecMapper;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.RecordReader;
/**
@@ -33,9 +35,12 @@
extends HiveContextAwareRecordReader {
private final RecordReader recordReader;
+ protected HiveIOExceptionHandlerChain ioExceptionHandler;
- public HiveRecordReader(RecordReader recordReader) throws IOException {
+ public HiveRecordReader(RecordReader recordReader, HiveIOExceptionHandlerChain ioExceptionHandler)
+ throws IOException {
this.recordReader = recordReader;
+ this.ioExceptionHandler = ioExceptionHandler;
}
public void doClose() throws IOException {
@@ -63,7 +68,11 @@
if (ExecMapper.getDone()) {
return false;
}
- return recordReader.next(key, value);
+ try {
+ return recordReader.next(key, value);
+ } catch (Exception e) {
+ return this.ioExceptionHandler.handleRecordReaderNextException(e);
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (revision 1101261)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (working copy)
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.LongWritable;
@@ -97,6 +98,8 @@
}
}
+ HiveIOExceptionHandlerChain ioExceptionHandler;
+
@Override
public RecordReader getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
@@ -105,7 +108,28 @@
// The target data is in TextInputFormat.
TextInputFormat inputFormat = new TextInputFormat();
inputFormat.configure(job);
- return inputFormat.getRecordReader(targetSplit, job, reporter);
+ if (ioExceptionHandler == null) {
+ ioExceptionHandler = HiveIOExceptionHandlerChain
+ .getHiveIOExceptionHandlerChain(job);
+ }
+ RecordReader innerReader = null;
+ try {
+ innerReader = inputFormat.getRecordReader(targetSplit, job,
+ reporter);
+ } catch (Exception e) {
+ innerReader = handleRecordReaderCreationException(e, inputFormat);
+ }
+ HiveRecordReader rr = new HiveRecordReader(innerReader, ioExceptionHandler);
+ rr.initIOContext((FileSplit) targetSplit, job, TextInputFormat.class, innerReader);
+ return rr;
+ }
+
+ private RecordReader handleRecordReaderCreationException(Exception e,
+ InputFormat inputFormat) throws IOException {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderCreationException(e);
+ }
+ throw new IOException(e);
}
/**
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (revision 1101261)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (working copy)
@@ -396,9 +396,10 @@
}
} else {
int jump = smallTblBucketNum / bigTblBucketNum;
+ List<String> bucketNames = aliasToBucketFileNamesMapping.get(alias);
for (int i = index; i < aliasToBucketFileNamesMapping.get(alias).size(); i = i + jump) {
if(i <= aliasToBucketFileNamesMapping.get(alias).size()) {
- resultFileNames.add(aliasToBucketFileNamesMapping.get(alias).get(i));
+ resultFileNames.add(bucketNames.get(i));
}
}
}
Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (revision 1101261)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (working copy)
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
@@ -209,10 +210,14 @@
protected RecordReader curReader;
protected boolean isShrinked;
protected long shrinkedLength;
+
+ protected HiveIOExceptionHandlerChain ioExceptionHandler;
public boolean next(K key, V value) throws IOException {
- while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) {
+ while ((curReader == null)
+ || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+ value)) {
if (!initNextRecordReader(key)) {
return false;
}
@@ -281,8 +286,44 @@
throw new RuntimeException(rrClass.getName() +
" does not have valid constructor", e);
}
+
+ this.ioExceptionHandler = HiveIOExceptionHandlerChain
+ .getHiveIOExceptionHandlerChain(job);
initNextRecordReader(null);
}
+
+ /**
+ * Calls next() on the current reader, routing any exception through the
+ * configured IO exception handler chain.
+ * @param key the key to populate
+ * @param value the value to populate
+ * @return true if a record was read; false to advance to the next reader
+ * @throws IOException if no handler handles the exception
+ */
+ private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+ try {
+ return curReader.next(key, value);
+ } catch (Exception e) {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderNextException(e);
+ }
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Handles an exception thrown while creating a record reader.
+ *
+ * @param e the exception raised during reader creation
+ * @return a replacement record reader supplied by a handler in the chain
+ * @throws IOException if no handler handles the exception
+ */
+ private RecordReader handleRecordReaderCreationException(Exception e)
+ throws IOException {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderCreationException(e);
+ }
+ throw new IOException(e);
+ }
/**
* Get the record reader for the next chunk in this CombineFileSplit.
@@ -318,7 +359,7 @@
jc.setLong("map.input.start", split.getOffset(idx));
jc.setLong("map.input.length", split.getLength(idx));
} catch (Exception e) {
- throw new RuntimeException(e);
+ curReader = handleRecordReaderCreationException(e);
}
idx++;
return true;
Index: shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (revision 1101261)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.hive.shims.Hadoop20Shims;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.Hadoop20Shims.InputSplitShim;
@@ -213,11 +214,15 @@
protected long progress;
protected RecordReader curReader;
protected boolean isShrinked;
- protected long shrinkedLength;
+ protected long shrinkedLength;
+
+ protected HiveIOExceptionHandlerChain ioExceptionHandler;
public boolean next(K key, V value) throws IOException {
- while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) {
+ while ((curReader == null)
+ || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+ value)) {
if (!initNextRecordReader(key)) {
return false;
}
@@ -286,8 +291,44 @@
throw new RuntimeException(rrClass.getName() +
" does not have valid constructor", e);
}
+
+ this.ioExceptionHandler = HiveIOExceptionHandlerChain
+ .getHiveIOExceptionHandlerChain(job);
initNextRecordReader(null);
}
+
+ /**
+ * Calls next() on the current reader, routing any exception through the
+ * configured IO exception handler chain.
+ * @param key the key to populate
+ * @param value the value to populate
+ * @return true if a record was read; false to advance to the next reader
+ * @throws IOException if no handler handles the exception
+ */
+ private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+ try {
+ return curReader.next(key, value);
+ } catch (Exception e) {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderNextException(e);
+ }
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Handles an exception thrown while creating a record reader.
+ *
+ * @param e the exception raised during reader creation
+ * @return a replacement record reader supplied by a handler in the chain
+ * @throws IOException if no handler handles the exception
+ */
+ private RecordReader handleRecordReaderCreationException(Exception e)
+ throws IOException {
+ if (ioExceptionHandler != null) {
+ return ioExceptionHandler.handleRecordReaderCreationException(e);
+ }
+ throw new IOException(e);
+ }
/**
* Get the record reader for the next chunk in this CombineFileSplit.
@@ -323,7 +364,7 @@
jc.setLong("map.input.start", split.getOffset(idx));
jc.setLong("map.input.length", split.getLength(idx));
} catch (Exception e) {
- throw new RuntimeException(e);
+ curReader = handleRecordReaderCreationException(e);
}
idx++;
return true;
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java (revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java (revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * HiveIOExceptionHandler defines the interface that all IO exception
+ * handlers in Hive should implement. Different handlers can implement
+ * different logic based on the exception passed to them.
+ */
+public interface HiveIOExceptionHandler {
+
+ /**
+ * Processes an exception raised while creating a record reader.
+ *
+ * @param e the exception to handle
+ * @return a replacement record reader, or null if this handler does not
+ * handle the exception
+ * @throws IOException
+ */
+ public RecordReader<?, ?> handleRecordReaderCreationException(Exception e)
+ throws IOException;
+
+ /**
+ * Processes an exception thrown by a record reader's next().
+ *
+ * @param e the exception to handle
+ * @param result records whether the exception was handled and, if so, the
+ * value next() should return
+ * @throws IOException
+ */
+ public void handleRecordReaderNextException(Exception e,
+ HiveIOExceptionNextHandleResult result) throws IOException;
+
+}
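For illustration, here is a minimal sketch of a conforming handler; the package and class names are hypothetical. It treats an EOFException raised by next() (e.g. from a truncated file) as end-of-input for the current reader, and declines to handle reader-creation failures by returning null, which lets the chain fall through to the next handler or re-throw:

package com.example.hive;

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
import org.apache.hadoop.hive.io.HiveIOExceptionNextHandleResult;
import org.apache.hadoop.mapred.RecordReader;

public class EOFExceptionSkipHandler implements HiveIOExceptionHandler {

  public RecordReader<?, ?> handleRecordReaderCreationException(Exception e)
      throws IOException {
    // null signals "not handled"; the chain tries the next handler or
    // re-throws the exception as an IOException
    return null;
  }

  public void handleRecordReaderNextException(Exception e,
      HiveIOExceptionNextHandleResult result) throws IOException {
    if (e instanceof EOFException) {
      // treat the truncated input as exhausted: next() returns false and
      // the calling reader advances to its next split
      result.setHandled(true);
      result.setHandleResult(false);
    }
    // otherwise leave the result untouched so later handlers can try
  }
}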
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java (revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java (revision 0)
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An exception handler chain that processes an exception by passing it to
+ * each handler in the chain, one by one, until a handler handles it or the
+ * end of the chain is reached. If no handler handles the exception, it is
+ * re-thrown to the caller as an IOException.
+ */
+public class HiveIOExceptionHandlerChain {
+
+ public static final String HIVE_IO_EXCEPTION_HANDLE_CHAIN = "hive.io.exception.handlers";
+
+ @SuppressWarnings("unchecked")
+ public static HiveIOExceptionHandlerChain getHiveIOExceptionHandlerChain(
+ JobConf conf) {
+ HiveIOExceptionHandlerChain chain = new HiveIOExceptionHandlerChain();
+ String exceptionHandlerStr = conf.get(HIVE_IO_EXCEPTION_HANDLE_CHAIN);
+ List<HiveIOExceptionHandler> handlerChain = new ArrayList<HiveIOExceptionHandler>();
+ if (exceptionHandlerStr != null && !exceptionHandlerStr.trim().equals("")) {
+ String[] handlerArr = exceptionHandlerStr.split(",");
+ if (handlerArr != null && handlerArr.length > 0) {
+ for (String handlerStr : handlerArr) {
+ if (!handlerStr.trim().equals("")) {
+ try {
+ Class<? extends HiveIOExceptionHandler> handlerCls =
+ (Class<? extends HiveIOExceptionHandler>) Class.forName(handlerStr);
+ HiveIOExceptionHandler handler = ReflectionUtils.newInstance(handlerCls, null);
+ handlerChain.add(handler);
+ } catch (Exception e) {
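+ // skip handler classes that fail to load; the rest of the chain is still built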
+ }
+ }
+ }
+ }
+ }
+
+ chain.setHandlerChain(handlerChain);
+ return chain;
+ }
+
+ private List<HiveIOExceptionHandler> handlerChain;
+
+ /**
+ * @return the exception handler chain defined
+ */
+ protected List<HiveIOExceptionHandler> getHandlerChain() {
+ return handlerChain;
+ }
+
+ /**
+ * set the exception handler chain
+ * @param handlerChain
+ */
+ protected void setHandlerChain(List<HiveIOExceptionHandler> handlerChain) {
+ this.handlerChain = handlerChain;
+ }
+
+ public RecordReader<?, ?> handleRecordReaderCreationException(Exception e) throws IOException {
+ RecordReader<?, ?> ret = null;
+
+ if (handlerChain != null && handlerChain.size() > 0) {
+ for (HiveIOExceptionHandler handler : handlerChain) {
+ ret = handler.handleRecordReaderCreationException(e);
+ if (ret != null) {
+ return ret;
+ }
+ }
+ }
+
+ //re-throw the exception as an IOException
+ throw new IOException(e);
+ }
+
+ public boolean handleRecordReaderNextException(Exception e)
+ throws IOException {
+ HiveIOExceptionNextHandleResult result = new HiveIOExceptionNextHandleResult();
+ if (handlerChain != null && handlerChain.size() > 0) {
+ for (HiveIOExceptionHandler handler : handlerChain) {
+ handler.handleRecordReaderNextException(e, result);
+ if (result.getHandled()) {
+ return result.getHandleResult();
+ }
+ }
+ }
+
+ //re-throw the exception as an IOException
+ throw new IOException(e);
+ }
+
+}
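Taken together, the reader-side pattern that the diffs above repeat is captured by this self-contained sketch (the class name is hypothetical; the API calls are the ones added by this patch):

import java.io.IOException;

import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class GuardedReaderSketch<K extends WritableComparable, V extends Writable> {

  private final RecordReader<K, V> curReader;
  private final HiveIOExceptionHandlerChain chain;

  public GuardedReaderSketch(RecordReader<K, V> curReader, JobConf job) {
    this.curReader = curReader;
    // build the chain once from hive.io.exception.handlers
    this.chain = HiveIOExceptionHandlerChain.getHiveIOExceptionHandlerChain(job);
  }

  public boolean guardedNext(K key, V value) throws IOException {
    try {
      return curReader.next(key, value);
    } catch (Exception e) {
      // a handler's verdict, or an IOException if no handler in the
      // chain claims the exception
      return chain.handleRecordReaderNextException(e);
    }
  }
}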
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java (revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java (revision 0)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.io;
+
+/**
+ * A container for the result of handling an exception thrown by a record
+ * reader's next(). It holds two fields: one records whether the exception
+ * was handled; the other stores the value that next() should return.
+ */
+public class HiveIOExceptionNextHandleResult {
+
+ // whether the exception has been handled
+ private boolean handled;
+
+ // the value next() should return when the exception is handled
+ private boolean handleResult;
+
+ public boolean getHandled() {
+ return handled;
+ }
+
+ public void setHandled(boolean handled) {
+ this.handled = handled;
+ }
+
+ public boolean getHandleResult() {
+ return handleResult;
+ }
+
+ public void setHandleResult(boolean handleResult) {
+ this.handleResult = handleResult;
+ }
+
+ public void clear() {
+ handled = false;
+ handleResult = false;
+ }
+
+}
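Finally, the handshake between a handler and this container, in miniature (a standalone demo, not part of the patch):

import org.apache.hadoop.hive.io.HiveIOExceptionNextHandleResult;

public class NextHandleResultDemo {
  public static void main(String[] args) {
    HiveIOExceptionNextHandleResult result = new HiveIOExceptionNextHandleResult();

    // a handler that swallows the error and ends the current reader
    result.setHandled(true);
    result.setHandleResult(false); // next() should report "no more records"

    // the chain's check, as in handleRecordReaderNextException
    if (result.getHandled()) {
      System.out.println("next() returns " + result.getHandleResult());
    }

    result.clear(); // reset both fields before reuse with another exception
  }
}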