Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 1478858) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision ) @@ -1613,23 +1613,23 @@ private int add_partitions_core(final RawStore ms, final List parts) throws MetaException, InvalidObjectException, AlreadyExistsException { - String db = parts.get(0).getDbName(); - String tblName = parts.get(0).getTableName(); - logInfo("add_partitions : db=" + db + " tbl=" + tblName); boolean success = false; Map addedPartitions = new HashMap(); + Map> tableToPartitionsMap = new HashMap>(); try { ms.openTransaction(); + firePreEvent(new PreAddPartitionEvent(parts, this)); for (Partition part : parts) { - // No environment context is passed. - Entry e = add_partition_core_notxn(ms, part, null); + Entry e = add_partition_core_notxn_no_events(ms, part, tableToPartitionsMap); addedPartitions.put(e.getKey(), e.getValue()); } success = ms.commitTransaction(); } finally { if (!success) { ms.rollbackTransaction(); + // Remove directories created for successfully added partitions, added prior. + // The partition that couldn't be added has already been cleaned up. for (Entry e : addedPartitions.entrySet()) { if (e.getValue()) { wh.deleteDir(new Path(e.getKey().getSd().getLocation()), true); @@ -1637,10 +1637,9 @@ } } } - for (Partition part : parts) { - fireMetaStoreAddPartitionEvent(ms, part, null, success); + // No environment context is passed. + fireAddPartitionEvent(new AddPartitionEvent(tableToPartitionsMap, success, this)); - } + } - } return parts.size(); } @@ -1679,26 +1678,25 @@ * An implementation of add_partition_core that does not commit * transaction or rollback transaction as part of its operation * - it is assumed that will be tended to from outside this call - * + * This implementation doesn't send partition-events either. * @param ms * @param part - * @param envContext - * parameters passed by the client * @return * @throws InvalidObjectException * @throws AlreadyExistsException * @throws MetaException */ - private Entry add_partition_core_notxn( - final RawStore ms, final Partition part, - final EnvironmentContext envContext) + private Entry add_partition_core_notxn_no_events( + final RawStore ms, final Partition part, Map> tableToPartitionMap) throws InvalidObjectException, AlreadyExistsException, MetaException { + String db = part.getDbName(); + String tblName = part.getTableName(); + logInfo("add_partitions : db=" + db + " tbl=" + tblName); + boolean success = false, madeDir = false; Path partLocation = null; Table tbl = null; try { - firePreEvent(new PreAddPartitionEvent(part, this)); - MetaStoreUtils.validatePartitionNameCharacters(part.getValues(), partitionValidationPattern); @@ -1781,7 +1779,9 @@ } } - success = ms.addPartition(part); + // Convert addPartition-failure to exception, to make failure conditions uniform. + if (!(success = ms.addPartition(part))) + throw new MetaException("Could not add partition: " + part); } finally { if (!success) { @@ -1789,20 +1789,33 @@ wh.deleteDir(partLocation, true); } } + else { + addPartitionToList(tbl, part, tableToPartitionMap); - } + } + } Map returnVal = new HashMap(); returnVal.put(part, madeDir); return returnVal.entrySet().iterator().next(); } + private static void addPartitionToList(Table table, Partition partition, + Map> tableToPartitionMap) { + if (!tableToPartitionMap.containsKey(table)) + tableToPartitionMap.put(table, new ArrayList()); + + tableToPartitionMap.get(table).add(partition); + } + private Partition add_partition_core(final RawStore ms, final Partition part, final EnvironmentContext envContext) throws InvalidObjectException, AlreadyExistsException, MetaException { boolean success = false; Partition retPtn = null; + Map> tableToPartitionsMap = new HashMap>(1); try { + firePreEvent(new PreAddPartitionEvent(part, this)); ms.openTransaction(); - retPtn = add_partition_core_notxn(ms, part, envContext).getKey(); + retPtn = add_partition_core_notxn_no_events(ms, part, tableToPartitionsMap).getKey(); // we proceed only if we'd actually succeeded anyway, otherwise, // we'd have thrown an exception success = ms.commitTransaction(); @@ -1810,20 +1823,22 @@ if (!success) { ms.rollbackTransaction(); } - fireMetaStoreAddPartitionEvent(ms, part, envContext, success); + fireMetaStoreAddPartitionEvent(tableToPartitionsMap, envContext, success); } return retPtn; } - private void fireMetaStoreAddPartitionEvent(final RawStore ms, - final Partition part, final EnvironmentContext envContext, boolean success) + private void fireMetaStoreAddPartitionEvent(Map> tableToPartitionsList, + final EnvironmentContext envContext, boolean success) throws MetaException { - final Table tbl = ms.getTable(part.getDbName(), part.getTableName()); - for (MetaStoreEventListener listener : listeners) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, part, success, this); + AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tableToPartitionsList, success, this); - addPartitionEvent.setEnvironmentContext(envContext); + addPartitionEvent.setEnvironmentContext(envContext); - listener.onAddPartition(addPartitionEvent); + fireAddPartitionEvent(addPartitionEvent); + } + + private void fireAddPartitionEvent(AddPartitionEvent event) throws MetaException { + for (MetaStoreEventListener listener : listeners) { + listener.onAddPartition(event); } } Index: metastore/src/java/org/apache/hadoop/hive/metastore/events/PreAddPartitionEvent.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/events/PreAddPartitionEvent.java (revision 1478858) +++ metastore/src/java/org/apache/hadoop/hive/metastore/events/PreAddPartitionEvent.java (revision ) @@ -21,19 +21,26 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.Partition; +import java.util.Arrays; +import java.util.List; + public class PreAddPartitionEvent extends PreEventContext { - private final Partition partition; + private final List partitions; - public PreAddPartitionEvent (Partition partition, HMSHandler handler) { + public PreAddPartitionEvent (List partitions, HMSHandler handler) { super(PreEventType.ADD_PARTITION, handler); - this.partition = partition; + this.partitions = partitions; } + public PreAddPartitionEvent(Partition partition, HMSHandler handler) { + this(Arrays.asList(partition), handler); + } + /** - * @return the partition + * @return the partitions */ - public Partition getPartition() { - return partition; + public List getPartitions() { + return partitions; } } Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java (revision 1478858) +++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java (revision ) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -223,12 +224,30 @@ AddPartitionEvent partEvent = (AddPartitionEvent)(notifyList.get(listSize-1)); assert partEvent.getStatus(); - validateAddPartition(part, partEvent.getPartition()); - validateTableInAddPartition(tbl, partEvent.getTable()); + validateAddPartition(part, partEvent.getTableToPartitionsMap().get(tbl).get(0)); + validateTableInAddPartition(tbl, partEvent.getTableToPartitionsMap().entrySet().iterator().next().getKey()); PreAddPartitionEvent prePartEvent = (PreAddPartitionEvent)(preNotifyList.get(listSize-1)); - validateAddPartition(part, prePartEvent.getPartition()); + validateAddPartition(part, prePartEvent.getPartitions().get(0)); + // Test adding multiple partitions in a single partition-set, atomically. + int currentTime = (int)System.currentTimeMillis(); + HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(hiveConf); + Table table = hmsClient.getTable(dbName, "tmptbl"); + Partition partition1 = new Partition(Arrays.asList("20110101"), dbName, "tmptbl", currentTime, + currentTime, table.getSd(), table.getParameters()); + Partition partition2 = new Partition(Arrays.asList("20110102"), dbName, "tmptbl", currentTime, + currentTime, table.getSd(), table.getParameters()); + Partition partition3 = new Partition(Arrays.asList("20110103"), dbName, "tmptbl", currentTime, + currentTime, table.getSd(), table.getParameters()); + hmsClient.add_partitions(Arrays.asList(partition1, partition2, partition3)); + ++listSize; + AddPartitionEvent multiplePartitionEvent = (AddPartitionEvent)(notifyList.get(listSize-1)); + assertEquals("Unexpected number of partitions in event!", 3, multiplePartitionEvent.getTableToPartitionsMap().get(table).size()); + assertEquals("Unexpected partition value.", partition1.getValues(), multiplePartitionEvent.getTableToPartitionsMap().get(table).get(0).getValues()); + assertEquals("Unexpected partition value.", partition2.getValues(), multiplePartitionEvent.getTableToPartitionsMap().get(table).get(1).getValues()); + assertEquals("Unexpected partition value.", partition3.getValues(), multiplePartitionEvent.getTableToPartitionsMap().get(table).get(2).getValues()); + driver.run(String.format("alter table %s touch partition (%s)", tblName, "b='2011'")); listSize++; assertEquals(notifyList.size(), listSize); @@ -260,11 +279,11 @@ AddPartitionEvent appendPartEvent = (AddPartitionEvent)(notifyList.get(listSize-1)); - validateAddPartition(newPart, appendPartEvent.getPartition()); + validateAddPartition(newPart, appendPartEvent.getTableToPartitionsMap().entrySet().iterator().next().getValue().get(0)); PreAddPartitionEvent preAppendPartEvent = (PreAddPartitionEvent)(preNotifyList.get(listSize-1)); - validateAddPartition(newPart, preAppendPartEvent.getPartition()); + validateAddPartition(newPart, preAppendPartEvent.getPartitions().get(0)); driver.run(String.format("alter table %s rename to %s", tblName, renamed)); listSize++; Index: metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java (revision 1478858) +++ metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java (revision ) @@ -22,28 +22,37 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class AddPartitionEvent extends ListenerEvent { - private final Table table; - private final Partition partition; + private Map> tableToPartitionsMap; - public AddPartitionEvent (Table table, Partition partition, boolean status, HMSHandler handler) { + public AddPartitionEvent(Map> tableToPartitionsMap, boolean status, HMSHandler handler) { - super (status, handler); + super(status, handler); - this.table = table; - this.partition = partition; + this.tableToPartitionsMap = tableToPartitionsMap; } - /** - * @return the partition - */ - public Partition getPartition() { - return partition; + public AddPartitionEvent(Table table, Partition partition, boolean status, HMSHandler handler) { + this(getTableToPartitionsMap(table, partition), status, handler); } /** - * @return the table + * Getter for mapping between tables and partitions added to those tables. + * @return The table-to-partitions map. */ - public Table getTable() { - return table; + public Map> getTableToPartitionsMap() { + return tableToPartitionsMap; + } + + private static Map> getTableToPartitionsMap(Table table, Partition partition) { + Map> tableToPartitionsMap = new HashMap>(); + tableToPartitionsMap.put(table, new ArrayList(Arrays.asList(partition))); + + return tableToPartitionsMap; } } Index: ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java (revision 1478858) +++ ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java (revision ) @@ -181,10 +181,11 @@ private void authorizeAddPartition(PreAddPartitionEvent context) throws InvalidOperationException, MetaException { try { - org.apache.hadoop.hive.metastore.api.Partition mapiPart = context.getPartition(); + for (org.apache.hadoop.hive.metastore.api.Partition mapiPart : context.getPartitions()) { - authorizer.authorize(getPartitionFromApiPartition(mapiPart, context), - HiveOperation.ALTERTABLE_ADDPARTS.getInputRequiredPrivileges(), - HiveOperation.ALTERTABLE_ADDPARTS.getOutputRequiredPrivileges()); + authorizer.authorize(getPartitionFromApiPartition(mapiPart, context), + HiveOperation.ALTERTABLE_ADDPARTS.getInputRequiredPrivileges(), + HiveOperation.ALTERTABLE_ADDPARTS.getOutputRequiredPrivileges()); + } } catch (AuthorizationException e) { throw invalidOperationException(e); } catch (NoSuchObjectException e) { Index: hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (revision 1478858) +++ hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (revision ) @@ -26,6 +26,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hcatalog.messaging.json.JSONMessageFactory; +import java.util.List; + /** * Abstract Factory for the construction of HCatalog message instances. */ @@ -123,10 +125,10 @@ /** * Factory method for AddPartitionMessage. * @param table The Table to which the partition is added. - * @param partition The Partition being added. + * @param partitions The set of Partitions being added. * @return AddPartitionMessage instance. */ - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition); + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List partitions); /** * Factory method for DropPartitionMessage. Index: hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1478858) +++ hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (revision ) @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -108,18 +110,23 @@ // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" if (partitionEvent.getStatus()) { - Partition partition = partitionEvent.getPartition(); - String topicName = getTopicName(partition, partitionEvent); + Map> tableToPartitionsMap = partitionEvent.getTableToPartitionsMap(); + + for (Map.Entry> tableAndPartitionsPair : tableToPartitionsMap.entrySet()) { + Table table = tableAndPartitionsPair.getKey(); + List partitions = tableAndPartitionsPair.getValue(); + String topicName = getTopicName(partitions.get(0), partitionEvent); - if (topicName != null && !topicName.equals("")) { + if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), partition), topicName); + send(messageFactory.buildAddPartitionMessage(table, partitions), topicName); - } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + } else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() + + partitions.get(0).getDbName() - + "." + + "." - + partition.getTableName() + + partitions.get(0).getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } } } Index: hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision 1478858) +++ hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision ) @@ -31,8 +31,10 @@ import org.apache.hcatalog.messaging.MessageDeserializer; import org.apache.hcatalog.messaging.MessageFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -83,10 +85,9 @@ } @Override - public AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition) { - return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), - partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), - System.currentTimeMillis()/1000); + public AddPartitionMessage buildAddPartitionMessage(Table table, List partitions) { + return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), getPartitionKeyValues(table, partitions), System.currentTimeMillis()/1000); } @Override @@ -102,5 +103,12 @@ partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i)); return partitionKeys; + } + + private static List> getPartitionKeyValues(Table table, List partitions) { + List> partitionList = new ArrayList>(partitions.size()); + for (Partition partition : partitions) + partitionList.add(getPartitionKeyValues(table, partition)); + return partitionList; } }