diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1df6094..052b70f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -210,6 +210,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -2486,7 +2487,8 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Get-Input-Summary-%d").build()); } else { - executor = null; + LOG.info("Not using thread pool for getContentSummary"); + executor = MoreExecutors.newDirectExecutorService(); } getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess), work, summary, executor); @@ -2513,6 +2515,7 @@ static void getInputSummaryWithPool(final Context ctx, final ExecutorService executor) throws IOException { Preconditions.checkNotNull(ctx); Preconditions.checkNotNull(pathNeedProcess); + Preconditions.checkNotNull(executor); List> futures = new ArrayList>(pathNeedProcess.size()); final AtomicLong totalLength = new AtomicLong(0L); @@ -2529,9 +2532,7 @@ public void interrupt() { LOG.debug("Failed to close filesystem", ignore); } } - if (executor != null) { - executor.shutdownNow(); - } + executor.shutdownNow(); } }); try { @@ -2624,41 +2625,29 @@ private void recordSummary(final Path p, final ContentSummary cs) { } }; - if (executor == null) { - r.run(); - } else { - Future future = executor.submit(r); - futures.add(future); - } + futures.add(executor.submit(r)); } - if (executor != null) { - for (Future future : futures) { - boolean executorDone = false; - do { - try { - future.get(); - executorDone = true; - } catch (InterruptedException e) { - LOG.info("Interrupted when waiting threads", e); - Thread.currentThread().interrupt(); - break; - } catch (ExecutionException e) { - throw new IOException(e); - } - } while (!executorDone); + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + LOG.info("Interrupted when waiting threads", e); + Thread.currentThread().interrupt(); + break; + } catch (ExecutionException e) { + throw new IOException(e); } - executor.shutdown(); } + executor.shutdown(); + HiveInterruptUtils.checkInterrupted(); summary[0] += totalLength.get(); summary[1] += totalFileCount.get(); summary[2] += totalDirectoryCount.get(); } finally { - if (executor != null) { - executor.shutdownNow(); - } + executor.shutdownNow(); HiveInterruptUtils.remove(interrup); } }