diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 8b03faa..bd72f7b 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -669,29 +669,27 @@ public static boolean distCp(FileSystem srcFS, List srcPaths, Path dst, */ public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean purge) throws IOException { - LOG.debug("deleting " + f); - boolean result = false; - try { - if(purge) { - LOG.debug("purge is set to true. Not moving to Trash " + f); - } else { - result = Trash.moveToAppropriateTrash(fs, f, conf); - if (result) { - LOG.trace("Moved to trash: " + f); - return true; + LOG.debug("Deleting file [skipTrash:{}] {}", purge, f); + if (!purge) { + try { + final boolean result = Trash.moveToAppropriateTrash(fs, f, conf); + LOG.debug("Moved to trash [success:{}]: {}", result, f); + return result; + } catch (IOException ioe) { + Throwable cause = ioe.getCause(); + if (cause instanceof InterruptedException) { + // Do not continue to try to delete, just exit + LOG.debug("Move to trash interrupted", cause); + throw ioe; } + + // Failure reasons include: trash has lower encryption zone, + // AccessControlException on file. Retry with force delete. + LOG.warn(ioe.getMessage() + "; Force to delete it."); } - } catch (IOException ioe) { - // for whatever failure reason including that trash has lower encryption zone - // retry with force delete - LOG.warn(ioe.getMessage() + "; Force to delete it."); } - result = fs.delete(f, true); - if (!result) { - LOG.error("Failed to delete " + f); - } - return result; + return fs.delete(f, true); } public static boolean rename(FileSystem fs, Path sourcePath, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 4350dc8..5fe2087 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -77,6 +77,7 @@ import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rex.RexBuilder; import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileChecksum; @@ -4866,39 +4867,50 @@ public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuse throws IOException { boolean result = true; - if (statuses == null || statuses.length == 0) { + if (ArrayUtils.isEmpty(statuses)) { return false; } - final List> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? - Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null; + + final int numThreads = + conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25); + + final List> futures = new ArrayList<>(numThreads); + + final ExecutorService pool = (numThreads > 0) + ? Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Delete-Thread-%d").build()) + : MoreExecutors.newDirectExecutorService(); + final SessionState parentSession = SessionState.get(); for (final FileStatus status : statuses) { - if (null == pool) { - result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge); - } else { - futures.add(pool.submit(new Callable() { - @Override - public Boolean call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - return FileUtils.moveToTrash(fs, status.getPath(), conf, purge); - } - })); - } - } - if (null != pool) { - pool.shutdown(); - for (Future future : futures) { - try { - result &= future.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to delete: ",e); - pool.shutdownNow(); - throw new IOException(e); + futures.add(pool.submit(new Callable() { + @Override + public Boolean call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + return FileUtils.moveToTrash(fs, status.getPath(), conf, purge); } + })); + } + + // No new tasks accepted; currently submitted will complete + pool.shutdown(); + + for (final Future future : futures) { + try { + result &= future.get(); + } catch (InterruptedException ie) { + LOG.error("Interrupted. Stopping trash action.", ie); + pool.shutdownNow(); + Thread.currentThread().interrupt(); + throw new IOException(ie); + } catch (ExecutionException e) { + LOG.error("Failed to delete file. Stopping trash action.", e); + pool.shutdownNow(); + throw new IOException(e); } } + return result; }