diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 4ec5138fb7..5516c8955f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2968,6 +2968,12 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, if (replace && !destIsSubDir) { destFs.delete(destf, true); LOG.debug("The path " + destf.toString() + " is deleted"); + } else if (replace && destIsSubDir) { + // if destFs contains old files but its partition is missing from hms, it may + // bring reduplicated files when replacing + if (!trashFilesUnderDir(destFs, destf, conf)) { + throw new HiveException("Delete files under dest " + destf + " failed"); + } } } catch (FileNotFoundException ignore) { //if dest dir does not exist, any re @@ -3360,37 +3366,39 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, * @return true if deletion successful * @throws IOException */ - private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf) + private static boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf) throws IOException { FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER); boolean result = true; - final List<Future<Boolean>> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? 
- Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null; - final SessionState parentSession = SessionState.get(); - for (final FileStatus status : statuses) { - if (null == pool) { - result &= FileUtils.moveToTrash(fs, status.getPath(), conf); - } else { - futures.add(pool.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - return FileUtils.moveToTrash(fs, status.getPath(), conf); - } - })); + if (statuses.length > 0) { + final List<Future<Boolean>> futures = new LinkedList<>(); + final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? + Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null; + final SessionState parentSession = SessionState.get(); + for (final FileStatus status : statuses) { + if (null == pool) { + result &= FileUtils.moveToTrash(fs, status.getPath(), conf); + } else { + futures.add(pool.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + return FileUtils.moveToTrash(fs, status.getPath(), conf); + } + })); + } } - } - if (null != pool) { - pool.shutdown(); - for (Future<Boolean> future : futures) { - try { - result &= future.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to delete: ",e); - pool.shutdownNow(); - throw new IOException(e); + if (null != pool) { + pool.shutdown(); + for (Future<Boolean> future : futures) { + try { + result &= future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to delete: ", e); + pool.shutdownNow(); + throw new IOException(e); + } } } }