diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 1ec00ab..8c4d28b 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -44,7 +44,6 @@ import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.MessageFactory; -import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -162,7 +161,7 @@ public void onAddPartition (AddPartitionEvent partitionEvent) Table t = partitionEvent.getTable(); NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_ADD_PARTITION_EVENT, - msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitions()).toString()); + msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 24f2c38..ea5a205 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import javax.jms.Connection; @@ -130,15 +129,14 @@ public void onAddPartition(AddPartitionEvent partitionEvent) // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" if (partitionEvent.getStatus()) { Table table = partitionEvent.getTable(); - List partitions = partitionEvent.getPartitions(); String topicName = getTopicName(table); if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(table, partitions), topicName); + send(messageFactory.buildAddPartitionMessage(table, partitionEvent.getPartitionIterator()), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partitions.get(0).getDbName() + + partitionEvent.getTable().getDbName() + "." - + partitions.get(0).getTableName() + + partitionEvent.getTable().getTableName() + " To enable notifications for this table, please do alter table set properties (" + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + "=.) or whatever you want topic name to be."); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 88df982..93b2abb 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -19,16 +19,14 @@ package org.apache.hive.hcatalog.messaging; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.messaging.json.JSONMessageFactory; +import java.util.Iterator; import java.util.List; /** @@ -142,17 +140,7 @@ public static MessageDeserializer getDeserializer(String format, * @param partitions The set of Partitions being added. * @return AddPartitionMessage instance. */ - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List partitions); - - /** - * Factory method for AddPartitionMessage. - * @param table The Table to which the partitions are added. - * @param partitionSpec The set of Partitions being added. - * @return AddPartitionMessage instance. - */ - @InterfaceAudience.LimitedPrivate({"Hive"}) - @InterfaceStability.Evolving - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec); + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions); /** * Factory method for building AlterPartitionMessage diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 0232f58..6dd14f0 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -19,14 +19,14 @@ package org.apache.hive.hcatalog.messaging.json; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterTableMessage; @@ -39,6 +39,7 @@ import org.apache.hive.hcatalog.messaging.MessageDeserializer; import org.apache.hive.hcatalog.messaging.MessageFactory; +import javax.annotation.Nullable; import java.util.*; /** @@ -98,17 +99,9 @@ public DropTableMessage buildDropTableMessage(Table table) { } @Override - public AddPartitionMessage buildAddPartitionMessage(Table table, List partitions) { + public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitionsIterator) { return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), getPartitionKeyValues(table, partitions), now()); - } - - @Override - @InterfaceAudience.LimitedPrivate({"Hive"}) - @InterfaceStability.Evolving - public AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec) { - return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), getPartitionKeyValues(table, partitionSpec), now()); + table.getTableName(), getPartitionKeyValues(table, partitionsIterator), now()); } @Override @@ -142,22 +135,12 @@ private long now() { return partitionKeys; } - private static List> getPartitionKeyValues(Table table, List partitions) { - List> partitionList = new ArrayList>(partitions.size()); - for (Partition partition : partitions) - partitionList.add(getPartitionKeyValues(table, partition)); - return partitionList; - } - - @InterfaceAudience.LimitedPrivate({"Hive"}) - @InterfaceStability.Evolving - private static List> getPartitionKeyValues(Table table, PartitionSpecProxy partitionSpec) { - List> partitionList = new ArrayList>(); - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition partition = iterator.next(); - partitionList.add(getPartitionKeyValues(table, partition)); - } - return partitionList; + private static List> getPartitionKeyValues(final Table table, Iterator iterator) { + return Lists.newArrayList(Iterators.transform(iterator, new Function>() { + @Override + public Map apply(@Nullable Partition partition) { + return getPartitionKeyValues(table,partition); + } + })); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java index 5b9c350..53906ed 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java @@ -62,17 +62,14 @@ public Table getTable() { } /** - * @return List of partitions. - */ - public List getPartitions() { - return partitions; - } - - /** * @return Iterator for partitions. */ public Iterator getPartitionIterator() { - return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator(); + if (partitions != null){ + return partitions.iterator(); + } else { + return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator(); + } } }