diff --git ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index e8f3623575..fd2ce3ec7a 100644
--- ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -87,6 +87,13 @@ public synchronized void alter_partition(String catName, String dbName, String t
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
   }
 
+  public void alter_partitions(String catName, String dbName, String tblName,
+      List<Partition> partitions, EnvironmentContext environmentContext,
+      String writeIdList, long writeId) throws TException {
+    client.alter_partitions(catName, dbName, tblName, partitions, environmentContext, writeIdList,
+        writeId);
+  }
+
   public synchronized LockResponse checkLock(long lockid) throws TException {
     return client.checkLock(lockid);
   }
@@ -129,4 +136,5 @@ public boolean isSameConfObj(Configuration c) {
   public boolean isCompatibleWith(Configuration c) {
     return client.isCompatibleWith(c);
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e185bf49d4..d24e10650e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2084,8 +2084,10 @@ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map<String, St
   private void addPartitionsToMetastore(List<Partition> partitions,
       List<AcidUtils.TableSnapshot> tableSnapshots) throws HiveException {
     try {
+      if (partitions.isEmpty() || tableSnapshots.isEmpty()) {
+        return;
+      }
       if (LOG.isDebugEnabled()) {
         StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
         partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
@@ -2201,7 +2208,9 @@ private void addPartitionsToMetastore(List<Partition> partitions,
       // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
       // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
       // In that case, we want to retry with alterPartition.
-      LOG.debug("Caught AlreadyExistsException, trying to add partitions one by one.");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Caught AlreadyExistsException, trying to add partitions one by one.");
+      }
       assert partitions.size() == tableSnapshots.size();
       for (int i = 0; i < partitions.size(); i++) {
         addPartitionToMetastore(partitions.get(i), hasFollowingStatsTask, tbl,
@@ -2308,17 +2317,40 @@ private void listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId,
   }
 
   private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
-      Partition newTPart, TableSnapshot tableSnapshot) throws MetaException, TException {
+      Partition newTPart, TableSnapshot tableSnapshot) throws TException {
     EnvironmentContext ec = new EnvironmentContext();
     if (hasFollowingStatsTask) {
       ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
     }
-    LOG.debug("Altering existing partition " + newTPart.getSpec());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Altering existing partition " + newTPart.getSpec());
+    }
     getSynchronizedMSC().alter_partition(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(),
         newTPart.getTPartition(), new EnvironmentContext(), tableSnapshot == null ?
         null : tableSnapshot.getValidWriteIdList());
   }
 
+  private void setStatsPropAndAlterPartitions(boolean hasFollowingStatsTask, Table tbl,
+      List<Partition> partitions, TableSnapshot tableSnapshot,
+      long writeId) throws TException {
+    if (partitions.isEmpty()) {
+      return;
+    }
+    EnvironmentContext ec = new EnvironmentContext();
+    if (hasFollowingStatsTask) {
+      ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("Altering existing partitions ");
+      partitions.forEach(p -> sb.append(p.getSpec()));
+      LOG.debug(sb.toString());
+    }
+
+    getSynchronizedMSC().alter_partitions(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(),
+        partitions.parallelStream().map(Partition::getTPartition).collect(Collectors.toList()),
+        ec, tableSnapshot.getValidWriteIdList(), writeId);
+  }
+
   /**
    * Walk through sub-directory tree to construct list bucketing location map.
    *
@@ -2624,7 +2656,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     Map<Map<String, String>, Partition> result = Maps.newLinkedHashMap();
     try {
       futures = executor.invokeAll(tasks);
-      LOG.debug("Number of partitionsToAdd to be added is " + futures.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of partitionsToAdd to be added is " + futures.size());
+      }
       for (Future<Partition> future : futures) {
         Partition partition = future.get();
         result.put(partition.getSpec(), partition);
@@ -2652,11 +2686,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
         if (isTxnTable && partitionDetails.newFiles != null) {
           addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
         }
-        if (partitionDetails.hasOldPartition) {
-          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
-              partitionDetails.tableSnapshot);
-        }
       }
+
+      setStatsPropAndAlterPartitions(hasFollowingStatsTask, tbl,
+          partitionDetailsMap.entrySet().parallelStream()
+              .filter(entry -> entry.getValue().hasOldPartition)
+              .map(entry -> entry.getValue().partition)
+              .collect(Collectors.toList()),
+          AcidUtils.getTableSnapshot(conf, tbl, true),
+          writeId);
     } catch (InterruptedException | ExecutionException e) {
       throw new HiveException("Exception when loading " + validPartitions.size()
           + " in table " + tbl.getTableName()
@@ -2680,7 +2718,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
       LOG.error(logMsg.toString(), e);
       throw e;
     } finally {
-      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+      }
       executor.shutdownNow();
     }
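
Note for reviewers: the patch replaces N per-partition alter_partition RPCs during dynamic
partition load with one batched alter_partitions call. A minimal caller-side sketch of that
pattern follows. The wrapper class and method names (BatchedAlterSketch, alterExistingPartitions)
are illustrative only; the alter_partitions overload, the isEmpty() short-circuit, and the
DO_NOT_UPDATE_STATS property are taken directly from the diff above.

import java.util.List;

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;

public final class BatchedAlterSketch {

  /** One metastore RPC for N existing partitions, mirroring setStatsPropAndAlterPartitions(). */
  static void alterExistingPartitions(SynchronizedMetaStoreClient msc, String catName,
      String dbName, String tblName, List<Partition> existingParts,
      boolean hasFollowingStatsTask, String validWriteIdList, long writeId) throws TException {
    if (existingParts.isEmpty()) {
      return; // nothing pre-existed, so there is nothing to alter and no RPC to send
    }
    EnvironmentContext ec = new EnvironmentContext();
    if (hasFollowingStatsTask) {
      // A stats task runs after the load, so tell the metastore not to touch stats now.
      ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
    }
    // Single batched call through the new SynchronizedMetaStoreClient.alter_partitions().
    msc.alter_partitions(catName, dbName, tblName, existingParts, ec, validWriteIdList, writeId);
  }
}

The cost of the old path grows with the partition count, since each pre-existing partition paid a
full round trip to the metastore; the batched call keeps the same EnvironmentContext semantics
while sending the whole list at once.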