Index: server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1438218) +++ server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -108,15 +109,15 @@ // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" if (partitionEvent.getStatus()) { - Partition partition = partitionEvent.getPartition(); - String topicName = getTopicName(partition, partitionEvent); + List partitions = partitionEvent.getPartitions(); + String topicName = getTopicName(partitions.get(0), partitionEvent); if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), partition), topicName); + send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), partitions), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() + + partitions.get(0).getDbName() + "." - + partition.getTableName() + + partitions.get(0).getTableName() + " To enable notifications for this table, please do alter table set properties (" + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + "=.) or whatever you want topic name to be."); Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision 1438218) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (working copy) @@ -31,8 +31,10 @@ import org.apache.hcatalog.messaging.MessageDeserializer; import org.apache.hcatalog.messaging.MessageFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -90,6 +92,12 @@ } @Override + public AddPartitionMessage buildAddPartitionMessage(Table table, List partitions) { + return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), getPartitionKeyValues(table, partitions), System.currentTimeMillis()/1000); + } + + @Override public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) { return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), @@ -103,4 +111,11 @@ partition.getValues().get(i)); return partitionKeys; } + + private static List> getPartitionKeyValues(Table table, List partitions) { + List> partitionKeyValueList = new ArrayList>(partitions.size()); + for (Partition partition : partitions) + partitionKeyValueList.add(getPartitionKeyValues(table, partition)); + return partitionKeyValueList; + } } Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (revision 1438218) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (working copy) @@ -26,6 +26,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hcatalog.messaging.json.JSONMessageFactory; +import java.util.List; + /** * Abstract Factory for the construction of HCatalog message instances. */ @@ -129,6 +131,14 @@ public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition); /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partition is added. + * @param partitions The List of Partitions being added. + * @return AddPartitionMessage instance. + */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List partitions); + + /** * Factory method for DropPartitionMessage. * @param table The Table from which the partition is dropped. * @param partition The Partition being dropped.