diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index ccadac1ada6aaae884ab39f5d99e91b8c542404e..aca0ccb89189ed138f5f967d5bfd1945b2d0f7a6 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -57,6 +57,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -260,8 +261,21 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, msdb.alterTable(dbname, name, newt); // alterPartition is only for changing the partition location in the table rename if (dataWasMoved) { - for (Partition part : parts) { - msdb.alterPartition(newDbName, newTblName, part.getValues(), part); + + int partsToProcess = parts.size(); + int partitionBatchSize = MetastoreConf.getIntVar(hiveConf, + MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX); + int batchStart = 0; + while (partsToProcess > 0) { + int batchEnd = Math.min(batchStart + partitionBatchSize, parts.size()); + List partBatch = parts.subList(batchStart, batchEnd); + partsToProcess -= partBatch.size(); + batchStart += partBatch.size(); + List> partValues = new LinkedList<>(); + for (Partition part : parts) { + partValues.add(part.getValues()); + } + msdb.alterPartitions(newDbName, newTblName, partValues, partBatch); } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 62801c53853dbafb7c425cff943ec819dcee4800..687a6fc01fafa2f40cabedf34b8f018c0637a8da 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -3572,7 +3572,9 @@ public void alterTable(String dbname, String name, Table newTable) oldt.setOwner(newt.getOwner()); // Fully copy over the contents of the new SD into the old SD, // so we don't create an extra SD in the metastore db that has no references. + MColumnDescriptor oldCD = oldt.getSd().getCD(); copyMSD(newt.getSd(), oldt.getSd()); + removeUnusedColumnDescriptor(oldCD); oldt.setRetention(newt.getRetention()); oldt.setPartitionKeys(newt.getPartitionKeys()); oldt.setTableType(newt.getTableType()); @@ -3621,12 +3623,23 @@ public void alterIndex(String dbname, String baseTblName, String name, Index new } } - private void alterPartitionNoTxn(String dbname, String name, List part_vals, + /** + * Alters an existing partition. Initiates copy of SD. Returns the old CD. + * @param dbname + * @param name + * @param part_vals Partition values (of the original partition instance) + * @param newPart Partition object containing new information + * @return The column descriptor of the old partition instance + * @throws InvalidObjectException + * @throws MetaException + */ + private MColumnDescriptor alterPartitionNoTxn(String dbname, String name, List part_vals, Partition newPart) throws InvalidObjectException, MetaException { name = normalizeIdentifier(name); dbname = normalizeIdentifier(dbname); MPartition oldp = getMPartition(dbname, name, part_vals); MPartition newp = convertToMPart(newPart, false); + MColumnDescriptor oldCD = oldp.getSd().getCD(); if (oldp == null || newp == null) { throw new InvalidObjectException("partition does not exist."); } @@ -3642,6 +3655,7 @@ private void alterPartitionNoTxn(String dbname, String name, List part_v if (newp.getLastAccessTime() != oldp.getLastAccessTime()) { oldp.setLastAccessTime(newp.getLastAccessTime()); } + return oldCD; } @Override @@ -3651,7 +3665,8 @@ public void alterPartition(String dbname, String name, List part_vals, P Exception e = null; try { openTransaction(); - alterPartitionNoTxn(dbname, name, part_vals, newPart); + MColumnDescriptor oldCd = alterPartitionNoTxn(dbname, name, part_vals, newPart); + removeUnusedColumnDescriptor(oldCd); // commit the changes success = commitTransaction(); } catch (Exception exception) { @@ -3677,9 +3692,14 @@ public void alterPartitions(String dbname, String name, List> part_ try { openTransaction(); Iterator> part_val_itr = part_vals.iterator(); + Set oldCds = new HashSet<>(); for (Partition tmpPart: newParts) { List tmpPartVals = part_val_itr.next(); - alterPartitionNoTxn(dbname, name, tmpPartVals, tmpPart); + MColumnDescriptor oldCd = alterPartitionNoTxn(dbname, name, tmpPartVals, tmpPart); + oldCds.add(oldCd); + } + for (MColumnDescriptor oldCd : oldCds) { + removeUnusedColumnDescriptor(oldCd); } // commit the changes success = commitTransaction(); @@ -3700,7 +3720,6 @@ public void alterPartitions(String dbname, String name, List> part_ private void copyMSD(MStorageDescriptor newSd, MStorageDescriptor oldSd) { oldSd.setLocation(newSd.getLocation()); - MColumnDescriptor oldCD = oldSd.getCD(); // If the columns of the old column descriptor != the columns of the new one, // then change the old storage descriptor's column descriptor. // Convert the MFieldSchema's to their thrift object counterparts, because we maintain @@ -3716,9 +3735,6 @@ private void copyMSD(MStorageDescriptor newSd, MStorageDescriptor oldSd) { oldSd.setCD(newSd.getCD()); } - //If oldCd does not have any more references, then we should delete it - // from the backend db - removeUnusedColumnDescriptor(oldCD); oldSd.setBucketCols(newSd.getBucketCols()); oldSd.setCompressed(newSd.isCompressed()); oldSd.setInputFormat(newSd.getInputFormat());