diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index b3f1f7c..d960252 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -333,6 +333,7 @@ private boolean isVectorMode(Configuration conf) { private final boolean cacheStripeDetails; private final AtomicInteger cacheHitCounter = new AtomicInteger(0); private final AtomicInteger numFilesCounter = new AtomicInteger(0); + private Throwable fatalError = null; /** * A count of the number of threads that may create more work for the @@ -388,10 +389,14 @@ OrcSplit getResult(int index) { * @param runnable the object to run */ synchronized void schedule(Runnable runnable) { - if (runnable instanceof FileGenerator) { - schedulers += 1; + if (fatalError == null) { + if (runnable instanceof FileGenerator || runnable instanceof SplitGenerator) { + schedulers += 1; + } + threadPool.execute(runnable); + } else { + throw new RuntimeException("serious problem", fatalError); } - threadPool.execute(runnable); } /** @@ -404,6 +409,11 @@ synchronized void decrementSchedulers() { } } + synchronized void notifyOnNonIOException(Throwable th) { + fatalError = th; + notify(); + } + /** * Wait until all of the tasks are done. It waits until all of the * threads that may create more work are done and then shuts down the @@ -413,6 +423,10 @@ synchronized void waitForTasks() { try { while (schedulers != 0) { wait(); + if (fatalError != null) { + threadPool.shutdownNow(); + throw new RuntimeException("serious problem", fatalError); + } } threadPool.shutdown(); threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); @@ -457,13 +471,19 @@ public void run() { } } // mark the fact that we are done - context.decrementSchedulers(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS); } catch (Throwable th) { - context.decrementSchedulers(); + if (!(th instanceof IOException)) { + LOG.error("Unexpected Exception", th); + } synchronized (context.errors) { context.errors.add(th); } + if (!(th instanceof IOException)) { + context.notifyOnNonIOException(th); + } + } finally { + context.decrementSchedulers(); } } @@ -643,9 +663,17 @@ public void run() { createSplit(currentOffset, currentLength, fileMetaInfo); } } catch (Throwable th) { + if (!(th instanceof IOException)) { + LOG.error("Unexpected Exception", th); + } synchronized (context.errors) { context.errors.add(th); } + if (!(th instanceof IOException)) { + context.notifyOnNonIOException(th); + } + } finally { + context.decrementSchedulers(); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS); } @@ -678,9 +706,15 @@ private void populateAndCacheStripeDetails() { } } } catch (Throwable th) { + if (!(th instanceof IOException)) { + LOG.error("Unexpected Exception", th); + } synchronized (context.errors) { context.errors.add(th); } + if (!(th instanceof IOException)) { + context.notifyOnNonIOException(th); + } } } @@ -705,7 +739,7 @@ private void populateAndCacheStripeDetails() { if (th instanceof IOException) { errors.add((IOException) th); } else { - throw new IOException("serious problem", th); + throw new RuntimeException("serious problem", th); } } throw new InvalidInputException(errors);