Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1129953)
+++ conf/hive-default.xml	(working copy)
@@ -1118,4 +1118,12 @@
     of data sampled.
+<property>
+  <name>hive.io.exception.handlers</name>
+  <value></value>
+  <description>A list of IO exception handler class names. This is used
+    to construct a list of exception handlers to handle exceptions thrown
+    by record readers</description>
+</property>
+
Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java	(revision 1129953)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java	(working copy)
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -40,10 +41,10 @@
   protected final InputFormat inputFormat;
   protected final JobConf jobConf;
   protected final Reporter reporter;
-  protected RecordReader curReader;
+  protected RecordReader curReader;
   protected long progress;
   protected int idx;
-
+
   public BucketizedHiveRecordReader(InputFormat inputFormat,
       BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
       Reporter reporter) throws IOException {
@@ -86,13 +87,21 @@
   }
 
   public boolean doNext(K key, V value) throws IOException {
-    while ((curReader == null) || !curReader.next(key, value)) {
+    while ((curReader == null) || !doNextWithExceptionHandler(key, value)) {
       if (!initNextRecordReader()) {
         return false;
       }
     }
     return true;
   }
+
+  private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+    try {
+      return curReader.next(key, value);
+    } catch (Exception e) {
+      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+    }
+  }
 
   /**
    * Get the record reader for the next chunk in this
@@ -117,9 +126,11 @@
       curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
           reporter);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jobConf);
     }
     idx++;
     return true;
   }
+
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java	(revision 1129953)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java	(working copy)
@@ -33,6 +33,9 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -171,7 +174,7 @@
   }
 
   JobConf job;
-
+
   public void configure(JobConf job) {
     this.job = job;
   }
@@ -204,7 +207,7 @@
       Reporter reporter) throws IOException {
 
     HiveInputSplit hsplit = (HiveInputSplit) split;
-
+
     InputSplit inputSplit = hsplit.getInputSplit();
     String inputFormatClassName = null;
     Class inputFormatClass = null;
@@ -234,14 +237,19 @@
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
         cloneJobConf);
 
-    RecordReader innerReader = inputFormat.getRecordReader(inputSplit,
+    RecordReader innerReader = null;
+    try {
+      innerReader = inputFormat.getRecordReader(inputSplit,
         cloneJobConf, reporter);
-
-    HiveRecordReader rr = new HiveRecordReader(innerReader);
+    } catch (Exception e) {
+      innerReader = HiveIOExceptionHandlerUtil
+          .handleRecordReaderCreationException(e, cloneJobConf);
+    }
+    HiveRecordReader rr = new HiveRecordReader(innerReader, job);
     rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
     return rr;
   }
-
+
   protected Map<String, PartitionDesc> pathToPartitionInfo;
   MapredWork mrwork = null;
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java	(revision 1129953)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java	(working copy)
@@ -20,9 +20,11 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.ExecMapper;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 /**
@@ -33,9 +35,18 @@
     extends HiveContextAwareRecordReader<K, V> {
 
   private final RecordReader recordReader;
+
+  private JobConf jobConf;
 
-  public HiveRecordReader(RecordReader recordReader) throws IOException {
+  public HiveRecordReader(RecordReader recordReader)
+      throws IOException {
+    this.recordReader = recordReader;
+  }
+
+  public HiveRecordReader(RecordReader recordReader, JobConf conf)
+      throws IOException {
     this.recordReader = recordReader;
+    this.jobConf = conf;
   }
 
   public void doClose() throws IOException {
@@ -63,7 +74,11 @@
     if (ExecMapper.getDone()) {
       return false;
     }
-    return recordReader.next(key, value);
+    try {
+      return recordReader.next(key, value);
+    } catch (Exception e) {
+      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+    }
   }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java	(revision 1129953)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java	(working copy)
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.LongWritable;
@@ -105,9 +106,19 @@
     // The target data is in TextInputFormat.
     TextInputFormat inputFormat = new TextInputFormat();
     inputFormat.configure(job);
-    return inputFormat.getRecordReader(targetSplit, job, reporter);
+    RecordReader innerReader = null;
+    try {
+      innerReader = inputFormat.getRecordReader(targetSplit, job,
+          reporter);
+    } catch (Exception e) {
+      innerReader = HiveIOExceptionHandlerUtil
+          .handleRecordReaderCreationException(e, job);
+    }
+    HiveRecordReader rr = new HiveRecordReader(innerReader, job);
+    rr.initIOContext((FileSplit) targetSplit, job, TextInputFormat.class, innerReader);
+    return rr;
   }
-
+
   /**
    * Parses all target paths from job input directory which contains symlink
    * files, and splits the target data using TextInputFormat.
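
The creation-time and next()-time hooks above all route through HiveIOExceptionHandlerUtil to the HiveIOExceptionHandler implementations named by hive.io.exception.handlers; both types are introduced by the new files later in this patch. As a hedged sketch of what such a handler can look like (the com.example class is hypothetical; only the interface and result type come from this patch), here is a handler that declines creation failures but converts next() failures into a clean end-of-split:

// Hypothetical handler sketch; com.example.SkipBadRecordsHandler is not part
// of this patch. It declines record-reader creation failures and converts
// next() failures into end-of-split.
package com.example;

import java.io.IOException;

import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
import org.apache.hadoop.hive.io.HiveIOExceptionNextHandleResult;
import org.apache.hadoop.mapred.RecordReader;

public class SkipBadRecordsHandler implements HiveIOExceptionHandler {

  public RecordReader handleRecordReaderCreationException(Exception e)
      throws IOException {
    // Returning null declines to handle: the chain asks the next handler and
    // re-throws the exception as an IOException if no handler returns a reader.
    return null;
  }

  public void handleRecordReaderNextException(Exception e,
      HiveIOExceptionNextHandleResult result) throws IOException {
    result.setHandled(true);       // stop the chain: this handler takes it
    result.setHandleResult(false); // next() returns false, ending this split
  }
}
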
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java	(revision 1129953)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java	(working copy)
@@ -396,9 +396,10 @@
         }
       } else {
         int jump = smallTblBucketNum / bigTblBucketNum;
+        List<String> bucketNames = aliasToBucketFileNamesMapping.get(alias);
         for (int i = index; i < aliasToBucketFileNamesMapping.get(alias).size(); i = i + jump) {
           if(i <= aliasToBucketFileNamesMapping.get(alias).size()) {
-            resultFileNames.add(aliasToBucketFileNamesMapping.get(alias).get(i));
+            resultFileNames.add(bucketNames.get(i));
           }
         }
       }
Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(revision 1129953)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(working copy)
@@ -32,6 +32,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -209,10 +211,12 @@
     protected RecordReader curReader;
     protected boolean isShrinked;
     protected long shrinkedLength;
-
+
     public boolean next(K key, V value) throws IOException {
-      while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) {
+      while ((curReader == null)
+          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+              value)) {
        if (!initNextRecordReader(key)) {
          return false;
        }
      }
@@ -283,6 +287,21 @@
      }
      initNextRecordReader(null);
    }
+
+    /**
+     * Do next and handle any exception raised inside it.
+     * @param key
+     * @param value
+     * @return
+     * @throws IOException
+     */
+    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+      try {
+        return curReader.next(key, value);
+      } catch (Exception e) {
+        return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
+      }
+    }
 
    /**
     * Get the record reader for the next chunk in this CombineFileSplit.
@@ -318,7 +337,7 @@
        jc.setLong("map.input.start", split.getOffset(idx));
        jc.setLong("map.input.length", split.getLength(idx));
      } catch (Exception e) {
-        throw new RuntimeException(e);
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
      }
      idx++;
      return true;
Index: shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(revision 1129953)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(working copy)
@@ -30,6 +30,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.shims.Hadoop20Shims;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.Hadoop20Shims.InputSplitShim;
@@ -213,11 +215,13 @@
     protected long progress;
     protected RecordReader curReader;
     protected boolean isShrinked;
-    protected long shrinkedLength;
+    protected long shrinkedLength;
 
     public boolean next(K key, V value) throws IOException {
-      while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) {
+      while ((curReader == null)
+          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+              value)) {
        if (!initNextRecordReader(key)) {
          return false;
        }
@@ -288,6 +292,22 @@
      }
      initNextRecordReader(null);
    }
+
+    /**
+     * Do next and handle any exception raised inside it.
+     * @param key
+     * @param value
+     * @return
+     * @throws IOException
+     */
+    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+      try {
+        return curReader.next(key, value);
+      } catch (Exception e) {
+        return HiveIOExceptionHandlerUtil
+            .handleRecordReaderNextException(e, jc);
+      }
+    }
 
    /**
     * Get the record reader for the next chunk in this CombineFileSplit.
@@ -323,7 +343,8 @@
        jc.setLong("map.input.start", split.getOffset(idx));
        jc.setLong("map.input.length", split.getLength(idx));
      } catch (Exception e) {
-        throw new RuntimeException(e);
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
+            e, jc);
      }
      idx++;
      return true;
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java	(revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java	(revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * HiveIOExceptionHandler defines the interface that every IO exception
+ * handler in Hive should implement. Different IO exception handlers can
+ * implement different logic based on the exception passed in.
+ */
+public interface HiveIOExceptionHandler {
+
+  /**
+   * Process an exception raised when creating a record reader.
+   *
+   * @param e
+   * @return a replacement record reader if the exception was handled,
+   *         null otherwise
+   */
+  public RecordReader handleRecordReaderCreationException(Exception e)
+      throws IOException;
+
+  /**
+   * Process an exception thrown when calling the record reader's next.
+   *
+   * @param e
+   * @param result
+   * @throws IOException
+   */
+  public void handleRecordReaderNextException(Exception e,
+      HiveIOExceptionNextHandleResult result) throws IOException;
+
+}
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java	(revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java	(revision 0)
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An exception handler chain that processes an input exception by passing it
+ * to the handlers defined in this chain, one by one, until either one handler
+ * handles it or the end of the chain is reached. If the end of the chain is
+ * reached and still no handler has handled the exception, it is re-thrown to
+ * the caller.
+ */
+public class HiveIOExceptionHandlerChain {
+
+  public static final String HIVE_IO_EXCEPTION_HANDLE_CHAIN = "hive.io.exception.handlers";
+
+  @SuppressWarnings("unchecked")
+  public static HiveIOExceptionHandlerChain getHiveIOExceptionHandlerChain(
+      JobConf conf) {
+    HiveIOExceptionHandlerChain chain = new HiveIOExceptionHandlerChain();
+    String exceptionHandlerStr = conf.get(HIVE_IO_EXCEPTION_HANDLE_CHAIN);
+    List<HiveIOExceptionHandler> handlerChain =
+        new ArrayList<HiveIOExceptionHandler>();
+    if (exceptionHandlerStr != null && !exceptionHandlerStr.trim().equals("")) {
+      String[] handlerArr = exceptionHandlerStr.split(",");
+      if (handlerArr != null && handlerArr.length > 0) {
+        for (String handlerStr : handlerArr) {
+          if (!handlerStr.trim().equals("")) {
+            try {
+              Class<? extends HiveIOExceptionHandler> handlerCls =
+                  (Class<? extends HiveIOExceptionHandler>) Class.forName(handlerStr);
+              HiveIOExceptionHandler handler =
+                  ReflectionUtils.newInstance(handlerCls, null);
+              handlerChain.add(handler);
+            } catch (Exception e) {
+              // ignore handler classes that cannot be loaded or instantiated
+            }
+          }
+        }
+      }
+    }
+
+    chain.setHandlerChain(handlerChain);
+    return chain;
+  }
+
+  private List<HiveIOExceptionHandler> handlerChain;
+
+  /**
+   * @return the exception handler chain defined
+   */
+  protected List<HiveIOExceptionHandler> getHandlerChain() {
+    return handlerChain;
+  }
+
+  /**
+   * Set the exception handler chain.
+   * @param handlerChain
+   */
+  protected void setHandlerChain(List<HiveIOExceptionHandler> handlerChain) {
+    this.handlerChain = handlerChain;
+  }
+
+  public RecordReader handleRecordReaderCreationException(Exception e)
+      throws IOException {
+    RecordReader ret = null;
+
+    if (handlerChain != null && handlerChain.size() > 0) {
+      for (HiveIOExceptionHandler handler : handlerChain) {
+        ret = handler.handleRecordReaderCreationException(e);
+        if (ret != null) {
+          return ret;
+        }
+      }
+    }
+
+    // re-throw the exception as an IOException
+    throw new IOException(e);
+  }
+
+  /**
+   * Handle an exception raised while calling a record reader's next. A
+   * HiveIOExceptionNextHandleResult carries each handler's outcome: a handler
+   * that handles the exception marks the result as handled and also sets the
+   * handle result. The handle result becomes the return value of the reader's
+   * next, which determines whether a new file needs to be opened for reading.
+   */
+  public boolean handleRecordReaderNextException(Exception e)
+      throws IOException {
+    HiveIOExceptionNextHandleResult result = new HiveIOExceptionNextHandleResult();
+    if (handlerChain != null && handlerChain.size() > 0) {
+      for (HiveIOExceptionHandler handler : handlerChain) {
+        handler.handleRecordReaderNextException(e, result);
+        if (result.getHandled()) {
+          return result.getHandleResult();
+        }
+      }
+    }
+
+    // re-throw the exception as an IOException
+    throw new IOException(e);
+  }
+
+}
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java	(revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java	(revision 0)
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class HiveIOExceptionHandlerUtil {
+
+  private static ThreadLocal<HiveIOExceptionHandlerChain> handlerChainInstance =
+      new ThreadLocal<HiveIOExceptionHandlerChain>();
+
+  private static HiveIOExceptionHandlerChain get(JobConf job) {
+    HiveIOExceptionHandlerChain cache =
+        HiveIOExceptionHandlerUtil.handlerChainInstance.get();
+    if (cache == null) {
+      HiveIOExceptionHandlerChain toSet =
+          HiveIOExceptionHandlerChain.getHiveIOExceptionHandlerChain(job);
+      handlerChainInstance.set(toSet);
+      cache = HiveIOExceptionHandlerUtil.handlerChainInstance.get();
+    }
+    return cache;
+  }
+
+  /**
+   * Handle an exception thrown when creating a record reader. If a handler
+   * can handle the exception raised while constructing the record reader, it
+   * should return a record reader, which is either a dummy empty record
+   * reader or a specific record reader that performs some recovery.
+   *
+   * @param e
+   * @param job
+   * @return
+   * @throws IOException
+   */
+  public static RecordReader handleRecordReaderCreationException(Exception e,
+      JobConf job) throws IOException {
+    HiveIOExceptionHandlerChain ioExceptionHandlerChain = get(job);
+    if (ioExceptionHandlerChain != null) {
+      return ioExceptionHandlerChain.handleRecordReaderCreationException(e);
+    }
+    throw new IOException(e);
+  }
+
+  /**
+   * Handle an exception thrown when calling a record reader's next. If the
+   * exception is handled by one handler, just return that handler's result.
+   * Otherwise the exception is re-thrown, either by one of the handlers or
+   * at the end of the handler chain.
+   *
+   * @param e
+   * @param job
+   * @return
+   * @throws IOException
+   */
+  public static boolean handleRecordReaderNextException(Exception e, JobConf job)
+      throws IOException {
+    HiveIOExceptionHandlerChain ioExceptionHandlerChain = get(job);
+    if (ioExceptionHandlerChain != null) {
+      return ioExceptionHandlerChain.handleRecordReaderNextException(e);
+    }
+    throw new IOException(e);
+  }
+
+}
Index: shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java	(revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java	(revision 0)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.io;
+
+/**
+ * A container to store the handling result for an exception raised in a
+ * record reader's next(). It contains two fields: one storing whether the
+ * exception has been handled, the other storing the handling result.
+ */
+public class HiveIOExceptionNextHandleResult {
+
+  // whether the exception has been handled
+  private boolean handled;
+
+  // the handling result
+  private boolean handleResult;
+
+  public boolean getHandled() {
+    return handled;
+  }
+
+  public void setHandled(boolean handled) {
+    this.handled = handled;
+  }
+
+  public boolean getHandleResult() {
+    return handleResult;
+  }
+
+  public void setHandleResult(boolean handleResult) {
+    this.handleResult = handleResult;
+  }
+
+  public void clear() {
+    handled = false;
+    handleResult = false;
+  }
+
+}
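
End to end, the feature is driven by the hive.io.exception.handlers property: a comma-separated list of handler class names, settable like any other Hive property (e.g. set hive.io.exception.handlers=com.example.SkipBadRecordsHandler; from the CLI). A hedged sketch of the same wiring done programmatically, reusing the hypothetical handler class sketched earlier:

// Sketch: building the handler chain from a JobConf, as the patched record
// readers do through HiveIOExceptionHandlerUtil (which caches one chain per
// thread). The handler class name is the hypothetical one sketched above.
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.mapred.JobConf;

public class HandlerChainWiring {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.set(HiveIOExceptionHandlerChain.HIVE_IO_EXCEPTION_HANDLE_CHAIN,
        "com.example.SkipBadRecordsHandler");

    // Each comma-separated class is loaded via Class.forName and instantiated
    // through ReflectionUtils; entries that fail to load are silently skipped.
    HiveIOExceptionHandlerChain chain =
        HiveIOExceptionHandlerChain.getHiveIOExceptionHandlerChain(conf);
    System.out.println("chain built: " + (chain != null));
  }
}
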