diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 82abd52..2e657b7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1693,16 +1693,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
-      String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask,
-      AcidUtils.Operation operation)
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
+      final String tableName, final Map<String, String> partSpec, final boolean replace,
+      final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {

     Set<Path> validPartitions = new HashSet<Path>();
+    int poolSize = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), 1);
+    final ExecutorService pool =
+        Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("load-dynamic-partitions-%d")
+                .build());
     try {
-      Map<Map<String, String>, Partition> partitionsMap = new
-          LinkedHashMap<Map<String, String>, Partition>();
+      final Map<Map<String, String>, Partition> partitionsMap =
+          Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());

       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
@@ -1711,7 +1718,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         validPartitions.add(s.getPath());
       }

-      int partsToLoad = validPartitions.size();
+      final int partsToLoad = validPartitions.size();
       if (partsToLoad == 0) {
         LOG.warn("No partition is generated by dynamic partitioning");
       }
@@ -1724,36 +1731,61 @@ private void constructOneLBLocationMap(FileStatus fSta,
             + " to at least " + partsToLoad + '.');
       }

-      Table tbl = getTable(tableName);
+      final Table tbl = getTable(tableName);
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
       LOG.info("Going to load " + partsToLoad + " partitions.");
-      PrintStream ps = null;
-      boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+      final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
           && InPlaceUpdates.inPlaceEligible(conf);
-      if(inPlaceEligible) {
-        ps = SessionState.getConsole().getInfoStream();
-      }
-      int partitionsLoaded = 0;
+      final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+      final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+      final List<Future<Boolean>> futures = new LinkedList<>();
+      final SessionState parentSession = SessionState.get();
       while (iter.hasNext()) {
         // get the dynamically created directory
-        Path partPath = iter.next();
+        final Path partPath = iter.next();
         assert fs.getFileStatus(partPath).isDir():
           "partitions " + partPath + " is not a directory !";

-        // generate a full partition specification
-        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
-        Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
-            true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
-        partitionsMap.put(fullPartSpec, newPartition);
-        if (inPlaceEligible) {
-          InPlaceUpdates.rePositionCursor(ps);
-          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions.");
-        }
-        LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+        futures.add(pool.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            try {
+              // move file would require session details (needCopy() invokes SessionState.get)
+              SessionState.setCurrentSessionState(parentSession);
+              // generate a full partition specification
+              LinkedHashMap<String, String> fullPartSpec =
+                  new LinkedHashMap<String, String>(partSpec);
+              Warehouse.makeSpecFromName(fullPartSpec, partPath);
+              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
+                  true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
+              partitionsMap.put(fullPartSpec, newPartition);
+
+              if (inPlaceEligible) {
+                synchronized (ps) {
+                  InPlaceUpdates.rePositionCursor(ps);
+                  partitionsLoaded.incrementAndGet();
+                  InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+                      + partsToLoad + " partitions.");
+                }
+              }
+              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+              return true;
+            } catch(Exception t) {
+              LOG.error("Exception when loading partition " + partPath, t);
+              throw t;
+            }
+          }
+        }));
       }
+      pool.shutdown();
+      LOG.debug("Number of partitions to be added is " + futures.size());
+
+      for (Future<Boolean> future : futures) {
+        future.get();
+      }
+
       if (isAcid) {
         List<String> partNames = new ArrayList<>(partitionsMap.size());
         for (Partition p : partitionsMap.values()) {
@@ -1767,6 +1799,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     } catch (TException te) {
       throw new HiveException(te);
+    } catch (InterruptedException e) {
+      throw new HiveException(e);
+    } catch (ExecutionException e) {
+      throw new HiveException(e);
     }
   }
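
Note on the concurrency pattern introduced by this patch: each dynamic-partition directory becomes a Callable<Boolean> submitted to a bounded pool of named daemon threads; results are collected in a Collections.synchronizedMap-wrapped LinkedHashMap so insertion order survives for the later ACID bookkeeping; an AtomicInteger tracks progress, printed under a lock on the shared console stream; and draining the Futures with get() rethrows the first task failure as an ExecutionException, which the new catch clauses convert to HiveException. The self-contained sketch below shows the same pattern in isolation. All names in it are hypothetical, and it uses the JDK default thread factory rather than Guava's ThreadFactoryBuilder, so it is a minimal illustration of the approach, not Hive code.

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the patch's parallel-load pattern; not Hive code. */
public class ParallelPartitionLoadSketch {

  public static void main(String[] args) throws Exception {
    final List<String> partPaths =
        Arrays.asList("ds=2016-08-01", "ds=2016-08-02", "ds=2016-08-03");
    final int partsToLoad = partPaths.size();

    // Bounded pool with a floor of 1, like Math.max(conf.getInt(...), 1) in the
    // patch. (The patch also names the threads and marks them daemon via Guava's
    // ThreadFactoryBuilder; the JDK default factory is used here for brevity.)
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(25, 1));

    // Tasks on pool threads all write here, so the map must be synchronized;
    // wrapping a LinkedHashMap keeps insertion order for later iteration.
    final Map<String, String> partitionsMap =
        Collections.synchronizedMap(new LinkedHashMap<String, String>());
    final AtomicInteger partitionsLoaded = new AtomicInteger(0);

    List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
    for (final String partPath : partPaths) {
      futures.add(pool.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          // Stand-in for the real work (loadPartition in the patch).
          partitionsMap.put(partPath, "loaded");
          int done = partitionsLoaded.incrementAndGet();
          System.out.println("Loaded : " + done + "/" + partsToLoad + " partitions.");
          return true;
        }
      }));
    }

    pool.shutdown(); // no new tasks accepted; queued tasks still run
    // get() rethrows any task failure as ExecutionException, so one bad
    // partition fails the whole load instead of being silently dropped.
    for (Future<Boolean> future : futures) {
      future.get();
    }
    System.out.println("Insertion order preserved: " + partitionsMap.keySet());
  }
}

Marking the pool threads as daemons, as the patch does with setDaemon(true), also guarantees that a stray worker cannot keep the JVM alive if the client exits before shutdown completes.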