diff --git ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index e8f3623575..300b4256a3 100644
--- ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -87,6 +87,13 @@ public synchronized void alter_partition(String catName, String dbName, String t
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
   }
 
+  public synchronized void alter_partitions(String catName, String dbName, String tblName,
+      List<Partition> newParts, EnvironmentContext environmentContext,
+      String writeIdList, long writeId) throws TException {
+    client.alter_partitions(catName, dbName, tblName, newParts, environmentContext, writeIdList,
+        writeId);
+  }
+
   public synchronized LockResponse checkLock(long lockid) throws TException {
     return client.checkLock(lockid);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 180b41e044..1e0554b360 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2294,12 +2294,33 @@ private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table
     if (hasFollowingStatsTask) {
       ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
     }
-    LOG.debug("Altering existing partition " + newTPart.getSpec());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Altering existing partition " + newTPart.getSpec());
+    }
     getSynchronizedMSC().alter_partition(tbl.getCatName(),
-        tbl.getDbName(), tbl.getTableName(), newTPart.getTPartition(), new EnvironmentContext(),
+        tbl.getDbName(), tbl.getTableName(), newTPart.getTPartition(), ec,
         tableSnapshot == null ? null : tableSnapshot.getValidWriteIdList());
   }
 
+  private void setStatsPropAndAlterPartitions(boolean hasFollowingStatsTask, Table tbl,
+      List<Partition> partitions, long writeId) throws TException {
+    EnvironmentContext ec = new EnvironmentContext();
+    if (hasFollowingStatsTask) {
+      ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("Altering existing partitions ");
+      partitions.forEach(p -> sb.append(p.getSpec()));
+      LOG.debug(sb.toString());
+    }
+
+    getSynchronizedMSC().alter_partitions(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(),
+        partitions.parallelStream().
+            map(Partition::getTPartition).
+            collect(Collectors.toList()),
+        ec, null, writeId);
+  }
+
   /**
    * Walk through sub-directory tree to construct list bucketing location map.
    *
@@ -2627,17 +2648,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
           .collect(Collectors.toList()));
       // For acid table, add the acid_write event with file list at the time of load itself. But
       // it should be done after partition is created.
-
       for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
         PartitionDetails partitionDetails = entry.getValue();
         if (isTxnTable && partitionDetails.newFiles != null) {
           addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
         }
-        if (partitionDetails.hasOldPartition) {
-          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
-              partitionDetails.tableSnapshot);
-        }
       }
+      setStatsPropAndAlterPartitions(hasFollowingStatsTask, tbl,
+          partitionDetailsMap.entrySet().parallelStream()
+              .filter(entry -> entry.getValue().hasOldPartition)
+              .map(entry -> entry.getValue().partition)
+              .collect(Collectors.toList())
+          , writeId);
     } catch (InterruptedException | ExecutionException e) {
       throw new HiveException("Exception when loading " + validPartitions.size()
           + " in table " + tbl.getTableName()
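The Hive.java change above replaces one alter_partition RPC per changed partition with a single batched alter_partitions call when loading dynamic partitions. A minimal standalone sketch of that batching pattern follows; MetaClient, TPartition, and PartitionDetails below are simplified stand-ins for illustration, not Hive's actual types.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchAlterSketch {

  // Stand-in for the Thrift partition object sent to the metastore.
  static class TPartition {
    final String spec;
    TPartition(String spec) { this.spec = spec; }
  }

  // Stand-in for Hive's per-partition bookkeeping during a load.
  static class PartitionDetails {
    final boolean hasOldPartition;
    final TPartition partition;
    PartitionDetails(boolean hasOldPartition, TPartition partition) {
      this.hasOldPartition = hasOldPartition;
      this.partition = partition;
    }
  }

  interface MetaClient {
    void alterPartition(TPartition part);                        // one RPC per partition
    void alterPartitions(List<TPartition> parts, long writeId);  // one RPC per batch
  }

  // Old shape: one metastore round trip for every partition that already existed.
  static void alterOneByOne(MetaClient client, Map<String, PartitionDetails> detailsMap) {
    for (PartitionDetails d : detailsMap.values()) {
      if (d.hasOldPartition) {
        client.alterPartition(d.partition);
      }
    }
  }

  // New shape: filter and collect first, then issue a single batched call,
  // mirroring the stream chain the diff feeds into setStatsPropAndAlterPartitions.
  static void alterBatched(MetaClient client, Map<String, PartitionDetails> detailsMap,
      long writeId) {
    List<TPartition> toAlter = detailsMap.values().stream()
        .filter(d -> d.hasOldPartition)
        .map(d -> d.partition)
        .collect(Collectors.toList());
    client.alterPartitions(toAlter, writeId);
  }
}

For loads that touch many partitions, collapsing the per-partition calls into one request removes most of the metastore round-trip latency, and the DO_NOT_UPDATE_STATS environment-context flag is set once for the whole batch instead of once per call.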
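The SynchronizedMetaStoreClient addition follows the same delegating-wrapper pattern as the neighboring alter_partition and checkLock methods: each public method is synchronized and simply forwards to the wrapped client, serializing all access because the underlying Thrift client is not thread-safe. A stripped-down sketch of that pattern, with a hypothetical RawClient interface standing in for the real client:

import java.util.List;

public class SynchronizedClientSketch {

  // Stand-in for the non-thread-safe client being wrapped.
  interface RawClient {
    void alterPartitions(List<Object> parts, long writeId) throws Exception;
  }

  private final RawClient client;

  public SynchronizedClientSketch(RawClient client) {
    this.client = client;
  }

  // Same shape as the alter_partitions wrapper added in the diff:
  // no logic of its own, just delegation under the instance lock.
  public synchronized void alterPartitions(List<Object> parts, long writeId) throws Exception {
    client.alterPartitions(parts, writeId);
  }
}

Note that the instance lock only serializes calls made through the same wrapper; it batches nothing itself, which is why the batching happens in Hive.java before the call crosses this boundary.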