diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a93c7e1..6abd1bf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -61,8 +61,11 @@ import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -73,6 +76,7 @@ import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.WordUtils; @@ -1542,40 +1546,72 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I return true; } - public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - int dpLevels, int numBuckets, Configuration hconf, Long mmWriteId) throws IOException { + public static List removeTempOrDuplicateFiles(final FileSystem fs, FileStatus[] fileStats, + int dpLevels, final int numBuckets, final Configuration hconf, final Long mmWriteId) throws + IOException { if (fileStats == null) { return null; } - List result = new ArrayList(); - HashMap taskIDToFile = null; + final List result = new ArrayList(); if (dpLevels > 0) { - FileStatus parts[] = fileStats; - for (int i = 0; i < parts.length; ++i) { - assert parts[i].isDirectory() : "dynamic partition " + parts[i].getPath() - + " is not a directory"; - Path path = parts[i].getPath(); - Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + path); - if (removeEmptyDpDirectory(fs, path)) { - parts[i] = null; - continue; - } - FileStatus[] items = fs.listStatus(path); + final FileStatus parts[] = fileStats; + int poolSize = Math.max(hconf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT + .varname, 1), 1); + Utilities.LOG14535.debug("Thread pool size=" + poolSize); + final ExecutorService pool = Executors.newFixedThreadPool(poolSize, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("remove-temp-dup-files-%d") + .build()); + + try { + List> futures = Lists.newLinkedList(); + for (int i = 0; i < parts.length; ++i) { + assert parts[i].isDirectory() : "dynamic partition " + parts[i].getPath() + + " is not a directory"; + final Path path = parts[i].getPath(); + Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + path); + + final Integer iVal = i; + futures.add(pool.submit(new Callable() { + @Override public Void call() throws Exception { + if (removeEmptyDpDirectory(fs, path)) { + parts[iVal] = null; + return null; + } + FileStatus[] items = fs.listStatus(path); - if (mmWriteId != null) { - Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) { - throw new IOException("Unexpected non-MM directory name " + mmDir); + if (mmWriteId != null) { + Path mmDir = parts[iVal].getPath(); + if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) { + throw new IOException("Unexpected non-MM directory name " + mmDir); + } + Utilities.LOG14535 + .info("removeTempOrDuplicateFiles processing files in MM directory " + mmDir); + } + HashMap taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); + + // TODO: not clear why two if conditions are different. Preserve the existing logic for now. + addBucketFileToResults(taskIDToFile, numBuckets, hconf, result); + return null; + } + } + )); + } + pool.shutdown(); + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); } - Utilities.LOG14535.info("removeTempOrDuplicateFiles processing files in MM directory " + mmDir); } - taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); - - // TODO: not clear why two if conditions are different. Preserve the existing logic for now. - addBucketFileToResults(taskIDToFile, numBuckets, hconf, result); + } finally { + pool.shutdownNow(); } } else { FileStatus[] items = fileStats; + HashMap taskIDToFile = null; if (items.length == 0) { return result; } @@ -3826,7 +3862,8 @@ private static void tryDelete(FileSystem fs, Path path) { } public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, - int lbLevels, PathFilter filter, long mmWriteId, Configuration conf) throws IOException { + int lbLevels, PathFilter filter, long mmWriteId, Configuration conf, + String unionSuffix, Set committed) throws IOException { int skipLevels = dpLevels + lbLevels; if (filter == null) { filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); @@ -3835,7 +3872,7 @@ private static void tryDelete(FileSystem fs, Path path) { return statusToPath(fs.listStatus(path, filter)); } if (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs)) { - return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); + return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter, unionSuffix, committed); } return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, mmWriteId); } @@ -3859,7 +3896,8 @@ private static boolean isS3(FileSystem fs) { } private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs, - Path path, int skipLevels, PathFilter filter) throws IOException { + Path path, int skipLevels, PathFilter filter, + String unionSuffix, Set committed) throws IOException { String lastRelDir = null; HashSet results = new HashSet(); String relRoot = Path.getPathWithoutSchemeAndAuthority(path).toString(); @@ -3869,6 +3907,15 @@ private static boolean isS3(FileSystem fs) { RemoteIterator allFiles = fs.listFiles(path, true); while (allFiles.hasNext()) { LocatedFileStatus lfs = allFiles.next(); + if (committed != null) { + //clean up committed on need basis. + if (unionSuffix == null) { + if (committed.contains(lfs.getPath().toString())) { + committed.remove(lfs.getPath().toString()); + Utilities.LOG14535.debug(lfs.getPath().toString() + " removed from committed set"); + } + } + } Path dirPath = Path.getPathWithoutSchemeAndAuthority(lfs.getPath()); String dir = dirPath.toString(); if (!dir.startsWith(relRoot)) { @@ -3920,7 +3967,7 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter, long mmWriteId, Configuration conf) throws IOException { Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf); + fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf, unionSuffix, null); if (files != null) { for (Path path : files) { Utilities.LOG14535.info("Deleting " + path + " on failure"); @@ -4009,17 +4056,8 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con Utilities.LOG14535.info("Creating table directory for CTAS with no output at " + specPath); FileUtils.mkdir(fs, specPath, hconf); } - Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf); - ArrayList mmDirectories = new ArrayList<>(); - if (files != null) { - for (Path path : files) { - Utilities.LOG14535.info("Looking at path: " + path); - mmDirectories.add(path); - } - } - HashSet committed = new HashSet<>(); + Set committed = new HashSet<>(); for (Path mfp : manifests) { try (FSDataInputStream mdis = fs.open(mfp)) { int fileCount = mdis.readInt(); @@ -4032,6 +4070,21 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } } + + Utilities.LOG14535.info("committed size:" + committed.size()); + Path[] files = getMmDirectoryCandidates( + fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf, unionSuffix, committed); + Utilities.LOG14535.info("committed size after scanning: " + committed.size()); + ArrayList mmDirectories = new ArrayList<>(); + if (files != null) { + for (Path path : files) { + Utilities.LOG14535.info("Looking at path: " + path); + mmDirectories.add(path); + } + } + Utilities.LOG14535.info("size of mmDirs:" + mmDirectories.size() + ", committed:" + + committed.size()); + if (manifestDir != null) { Utilities.LOG14535.info("Deleting manifest directory " + manifestDir); tryDelete(fs, manifestDir); @@ -4046,8 +4099,10 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } } - for (Path path : mmDirectories) { - cleanMmDirectory(path, fs, unionSuffix, committed); + if (!committed.isEmpty()) { + for (Path path : mmDirectories) { + cleanMmDirectory(path, fs, unionSuffix, committed); + } } if (!committed.isEmpty()) { @@ -4081,7 +4136,7 @@ public PathOnlyFileStatus(Path path) { } private static void cleanMmDirectory(Path dir, FileSystem fs, - String unionSuffix, HashSet committed) throws IOException, HiveException { + String unionSuffix, Set committed) throws IOException, HiveException { for (FileStatus child : fs.listStatus(dir)) { Path childPath = child.getPath(); if (unionSuffix == null) {