diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ef0bb3d..de59af6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1499,6 +1499,16 @@ public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
+
+    Partition oldPart = getPartition(tbl, partSpec, false);
+    return loadPartition(loadPath, tbl, partSpec, oldPart, replace, inheritTableSpecs,
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask);
+  }
+
+  public Partition loadPartition(Path loadPath, Table tbl,
+      Map<String, String> partSpec, Partition oldPart, boolean replace,
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
     Path tblDataLocationPath = tbl.getDataLocation();
     try {
       /**
@@ -1508,7 +1518,6 @@ public Partition loadPartition(Path loadPath, Table tbl,
        * processes might move forward with partial data
        */
-      Partition oldPart = getPartition(tbl, partSpec, false);
       Path oldPartPath = null;
       if(oldPart != null) {
         oldPartPath = oldPart.getDataLocation();
       }
@@ -1582,14 +1591,19 @@ public Partition loadPartition(Path loadPath, Table tbl,
               StatsSetupConst.TRUE);
         }
         MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
-        getMSC().add_partition(newTPart.getTPartition());
+        synchronized (metaStoreClient) {
+          getMSC().add_partition(newTPart.getTPartition());
+        }
       } else {
         EnvironmentContext environmentContext = null;
         if (hasFollowingStatsTask) {
           environmentContext = new EnvironmentContext();
           environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
         }
-        alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()), environmentContext);
+        synchronized (metaStoreClient) {
+          alterPartition(tbl.getDbName(), tbl.getTableName(),
+              new Partition(tbl, newTPart.getTPartition()), environmentContext);
+        }
       }
       return newTPart;
     } catch (IOException e) {
@@ -1705,16 +1719,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
-      String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask,
-      AcidUtils.Operation operation)
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
+      final String tableName, final Map<String, String> partSpec, final boolean replace,
+      final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
     Set<Path> validPartitions = new HashSet<Path>();
+    int poolSize = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), 1);
+    final ExecutorService pool =
+        Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("load-dynamic-partitions-%d")
+                .build());
     try {
-      Map<Map<String, String>, Partition> partitionsMap = new
-          LinkedHashMap<Map<String, String>, Partition>();
+      final Map<Map<String, String>, Partition> partitionsMap =
+          Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());

       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
@@ -1723,7 +1744,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         validPartitions.add(s.getPath());
       }

-      int partsToLoad = validPartitions.size();
+      final int partsToLoad = validPartitions.size();
       if (partsToLoad == 0) {
         LOG.warn("No partition is generated by dynamic partitioning");
       }
@@ -1736,36 +1757,73 @@ private void constructOneLBLocationMap(FileStatus fSta,
             + " to at least " + partsToLoad + '.');
       }

-      Table tbl = getTable(tableName);
+      final Table tbl = getTable(tableName);
+      LOG.info("Getting partition details for spec : " + partSpec + " for table " + tableName);
+      List<Partition> partitions = getPartitions(tbl, partSpec, (short) -1);
+      final Map<String, Partition> tablePartitions =
+          Collections.synchronizedMap(new LinkedHashMap<String, Partition>());
+      for(Partition p : partitions) {
+        tablePartitions.put(p.getSpec().toString(), p);
+      }
+
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
       LOG.info("Going to load " + partsToLoad + " partitions.");
-      PrintStream ps = null;
-      boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+      final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
           && InPlaceUpdates.inPlaceEligible(conf);
-      if(inPlaceEligible) {
-        ps = SessionState.getConsole().getInfoStream();
-      }
-      int partitionsLoaded = 0;
+      final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+      final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+      final List<Future<Boolean>> futures = new LinkedList<>();
+      final SessionState parentSession = SessionState.get();
       while (iter.hasNext()) {
         // get the dynamically created directory
-        Path partPath = iter.next();
+        final Path partPath = iter.next();
         assert fs.getFileStatus(partPath).isDir(): "partitions " + partPath + " is not a directory !";
         // generate a full partition specification
-        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+        final LinkedHashMap<String, String> fullPartSpec =
+            new LinkedHashMap<String, String>(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
-            true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
-        partitionsMap.put(fullPartSpec, newPartition);
-        if (inPlaceEligible) {
-          InPlaceUpdates.rePositionCursor(ps);
-          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad + " partitions.");
-        }
-        LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+
+        futures.add(pool.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            try {
+              // move file would require session details (needCopy() invokes SessionState.get)
+              SessionState.setCurrentSessionState(parentSession);
+
+              // load the partition
+              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
+                  tablePartitions.get(fullPartSpec.toString()), replace,
+                  true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
+              partitionsMap.put(fullPartSpec, newPartition);
+
+              if (inPlaceEligible) {
+                synchronized (ps) {
+                  InPlaceUpdates.rePositionCursor(ps);
+                  partitionsLoaded.incrementAndGet();
+                  InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/" +
+                      partsToLoad + " partitions.");
+                }
+              }
+              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+              return true;
+            } catch (Exception t) {
+              LOG.error("Exception when loading partition " + partPath, t);
+              throw t;
+            }
+          }
+        }));
+      }
+      pool.shutdown();
+      LOG.debug("Number of partitions to be added is " + futures.size());
+
+      for(Future<Boolean> future : futures) {
+        future.get();
       }
+
       if (isAcid) {
         List<String> partNames = new ArrayList<>(partitionsMap.size());
         for (Partition p : partitionsMap.values()) {
@@ -1779,6 +1837,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     } catch (TException te) {
       throw new HiveException(te);
+    } catch (InterruptedException e) {
+      throw new HiveException(e);
+    } catch (ExecutionException e) {
+      throw new HiveException(e);
     }
   }
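
Note (illustrative, not part of the patch): the loadDynamicPartitions() change above follows a standard fan-out pattern -- a bounded daemon thread pool, one Callable per dynamically generated partition directory, results collected in a synchronized map, and worker failures surfaced through Future.get() and rethrown. The self-contained sketch below shows the same pattern outside of Hive; the class name PartitionLoader, the "load.threads" system property, and the partition directory strings are made up for the example.

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionLoader {

  public static void main(String[] args) throws Exception {
    // Stand-in for the dynamically created partition directories found under loadPath.
    final List<String> partitionDirs =
        Arrays.asList("ds=2016-01-01/hr=00", "ds=2016-01-01/hr=01", "ds=2016-01-01/hr=02");

    // Bounded daemon pool, analogous to the patch's Executors.newFixedThreadPool(poolSize, ...).
    int poolSize = Math.max(Integer.getInteger("load.threads", 25), 1);
    final ExecutorService pool = Executors.newFixedThreadPool(poolSize, r -> {
      Thread t = new Thread(r, "load-dynamic-partitions");
      t.setDaemon(true);
      return t;
    });

    // Synchronized map plays the role of the patch's synchronized partitionsMap.
    final Map<String, String> loaded =
        Collections.synchronizedMap(new LinkedHashMap<String, String>());
    final List<Future<Boolean>> futures = new LinkedList<>();

    for (final String dir : partitionDirs) {
      futures.add(pool.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() {
          // The real patch calls loadPartition(...) here; this sketch only records the work.
          loaded.put(dir, "loaded");
          return true;
        }
      }));
    }
    pool.shutdown();

    // Future.get() rethrows the first worker failure, which the patch wraps in HiveException.
    for (Future<Boolean> future : futures) {
      future.get();
    }
    System.out.println("Loaded " + loaded.size() + "/" + partitionDirs.size() + " partitions");
  }
}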