diff --git ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index e8f3623575..0372064fb3 100644
--- ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -87,6 +87,13 @@ public synchronized void alter_partition(String catName, String dbName, String t
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
   }
 
+  public void alter_partitions(String catName, String dbName, String tblName,
+      List<Partition> partitions, EnvironmentContext environmentContext,
+      String writeIdList, long writeId) throws TException {
+    client.alter_partitions(catName, dbName, tblName, partitions, environmentContext, writeIdList,
+        writeId);
+  }
+
   public synchronized LockResponse checkLock(long lockid) throws TException {
     return client.checkLock(lockid);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e185bf49d4..c8360fa61f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -57,6 +57,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -2140,9 +2141,7 @@ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map
[...]
   private void addPartitionsToMetastore(List<Partition> partitions, boolean hasFollowingStatsTask,
       Table tbl, List<AcidUtils.TableSnapshot> tableSnapshots) throws HiveException {
     try {
+      if (partitions.isEmpty() || tableSnapshots.isEmpty()) {
+        return;
+      }
       if (LOG.isDebugEnabled()) {
         StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
         partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
         LOG.debug(debugMsg.toString());
       }
-      getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition)
+      getSynchronizedMSC().add_partitions(partitions.stream().map(Partition::getTPartition)
           .collect(Collectors.toList()));
     } catch(AlreadyExistsException aee) {
       // With multiple users concurrently issuing insert statements on the same partition has
@@ -2308,7 +2310,7 @@ private void listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId,
   }
 
   private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
-      Partition newTPart, TableSnapshot tableSnapshot) throws MetaException, TException {
+      Partition newTPart, TableSnapshot tableSnapshot) throws TException {
     EnvironmentContext ec = new EnvironmentContext();
     if (hasFollowingStatsTask) {
       ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
@@ -2319,6 +2321,27 @@ private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table
         tableSnapshot == null ?
             null : tableSnapshot.getValidWriteIdList());
   }
+  private void setStatsPropAndAlterPartitions(boolean hasFollowingStatsTask, Table tbl,
+      List<Partition> partitions,
+      long writeId) throws TException {
+    if (partitions.isEmpty()) {
+      return;
+    }
+    EnvironmentContext ec = new EnvironmentContext();
+    if (hasFollowingStatsTask) {
+      ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("Altering existing partitions ");
+      partitions.forEach(p -> sb.append(p.getSpec()));
+      LOG.debug(sb.toString());
+    }
+
+    getSynchronizedMSC().alter_partitions(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(),
+        partitions.stream().map(Partition::getTPartition).collect(Collectors.toList()),
+        ec, null, writeId);
+  }
+
   /**
    * Walk through sub-directory tree to construct list bucketing location map.
    *
@@ -2553,7 +2576,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // Match valid partition path to partitions
       while (iterator.hasNext()) {
         Partition partition = iterator.next();
-        partitionDetailsMap.entrySet().parallelStream()
+        partitionDetailsMap.entrySet().stream()
             .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
             .findAny().ifPresent(entry -> {
               entry.getValue().partition = partition;
@@ -2633,14 +2656,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       addPartitionsToMetastore(
           partitionDetailsMap.entrySet()
-              .parallelStream()
+              .stream()
              .filter(entry -> !entry.getValue().hasOldPartition)
              .map(entry -> entry.getValue().partition)
              .collect(Collectors.toList()),
          hasFollowingStatsTask,
          tbl,
          partitionDetailsMap.entrySet()
-              .parallelStream()
+              .stream()
              .filter(entry -> !entry.getValue().hasOldPartition)
              .map(entry -> entry.getValue().tableSnapshot)
              .collect(Collectors.toList()));
@@ -2652,11 +2675,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
         if (isTxnTable && partitionDetails.newFiles != null) {
           addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
         }
-        if (partitionDetails.hasOldPartition) {
-          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
-              partitionDetails.tableSnapshot);
-        }
       }
+
+      setStatsPropAndAlterPartitions(hasFollowingStatsTask, tbl,
+          partitionDetailsMap.entrySet().stream()
+              .filter(entry -> entry.getValue().hasOldPartition)
+              .map(entry -> entry.getValue().partition)
+              .collect(Collectors.toList()), writeId);
+
     } catch (InterruptedException | ExecutionException e) {
       throw new HiveException("Exception when loading " + validPartitions.size()
           + " in table " + tbl.getTableName()
@@ -2687,7 +2713,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       if (isAcid) {
         List<String> partNames =
-            result.values().parallelStream().map(Partition::getName).collect(Collectors.toList());
+            result.values().stream().map(Partition::getName).collect(Collectors.toList());
         getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId,
             tbl.getDbName(), tbl.getTableName(), partNames,
             AcidUtils.toDataOperationType(operation));
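
The net effect of the Hive.java changes is to stop issuing one alter_partition RPC per partition inside the dynamic-partition load loop and instead collect the partitions that already existed and alter them in a single batched alter_partitions call after the loop, with an early return when the batch is empty. The sketch below is a minimal, hypothetical illustration of that before/after pattern, not code from the patch: Partition, MetastoreClient, and both alter methods are illustrative stand-ins for the real Hive metastore client API.

import java.util.List;

// Hypothetical stand-ins for the Hive metastore API, for illustration only.
class Partition {
  final String spec;
  Partition(String spec) { this.spec = spec; }
}

interface MetastoreClient {
  void alterPartition(Partition p);           // one RPC per partition
  void alterPartitions(List<Partition> ps);   // one RPC for the whole batch
}

class BatchAlterSketch {
  // Before the patch: N changed partitions cost N metastore round trips.
  static void alterOneByOne(MetastoreClient client, List<Partition> changed) {
    for (Partition p : changed) {
      client.alterPartition(p);
    }
  }

  // After the patch: collect the changed partitions first, then issue a
  // single bulk call, guarded by the same empty-list early return the
  // patch adds to setStatsPropAndAlterPartitions.
  static void alterBatched(MetastoreClient client, List<Partition> changed) {
    if (changed.isEmpty()) {
      return;
    }
    client.alterPartitions(changed);
  }
}

The companion change from parallelStream() to stream() keeps the collecting work on the calling thread; it is the batching of the metastore calls, not intra-JVM parallelism, that removes the per-partition round trips.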