diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 1ec00ab..8c4d28b 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -44,7 +44,6 @@ import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.MessageFactory; -import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -162,7 +161,7 @@ public void onAddPartition (AddPartitionEvent partitionEvent) Table t = partitionEvent.getTable(); NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_ADD_PARTITION_EVENT, - msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitions()).toString()); + msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); enqueue(event); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 24f2c38..1718d79 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -21,8 +21,6 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.List; -import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -130,15 +128,14 @@ public void onAddPartition(AddPartitionEvent partitionEvent) // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" if (partitionEvent.getStatus()) { Table table = partitionEvent.getTable(); - List partitions = partitionEvent.getPartitions(); String topicName = getTopicName(table); if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(table, partitions), topicName); + send(messageFactory.buildAddPartitionMessage(table, partitionEvent.getPartitionIterator()), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partitions.get(0).getDbName() + + partitionEvent.getTable().getDbName() + "." - + partitions.get(0).getTableName() + + partitionEvent.getTable().getTableName() + " To enable notifications for this table, please do alter table set properties (" + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + "=.) or whatever you want topic name to be."); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 5ad1d68..2b16745 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -20,16 +20,14 @@ package org.apache.hive.hcatalog.messaging; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.messaging.json.JSONMessageFactory; +import java.util.Iterator; import java.util.List; /** @@ -140,20 +138,10 @@ public static MessageDeserializer getDeserializer(String format, /** * Factory method for AddPartitionMessage. * @param table The Table to which the partitions are added. - * @param partitions The set of Partitions being added. + * @param partitions The iterator to set of Partitions being added. * @return AddPartitionMessage instance. */ - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List partitions); - - /** - * Factory method for AddPartitionMessage. - * @param table The Table to which the partitions are added. - * @param partitionSpec The set of Partitions being added. - * @return AddPartitionMessage instance. - */ - @InterfaceAudience.LimitedPrivate({"Hive"}) - @InterfaceStability.Evolving - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec); + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions); /** * Factory method for building AlterPartitionMessage diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 0232f58..06efb89 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -19,14 +19,14 @@ package org.apache.hive.hcatalog.messaging.json; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterTableMessage; @@ -39,7 +39,12 @@ import org.apache.hive.hcatalog.messaging.MessageDeserializer; import org.apache.hive.hcatalog.messaging.MessageFactory; -import java.util.*; +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; /** * The JSON implementation of the MessageFactory. Constructs JSON implementations of @@ -98,17 +103,9 @@ public DropTableMessage buildDropTableMessage(Table table) { } @Override - public AddPartitionMessage buildAddPartitionMessage(Table table, List partitions) { + public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitionsIterator) { return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), getPartitionKeyValues(table, partitions), now()); - } - - @Override - @InterfaceAudience.LimitedPrivate({"Hive"}) - @InterfaceStability.Evolving - public AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec) { - return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), getPartitionKeyValues(table, partitionSpec), now()); + table.getTableName(), getPartitionKeyValues(table, partitionsIterator), now()); } @Override @@ -142,22 +139,12 @@ private long now() { return partitionKeys; } - private static List> getPartitionKeyValues(Table table, List partitions) { - List> partitionList = new ArrayList>(partitions.size()); - for (Partition partition : partitions) - partitionList.add(getPartitionKeyValues(table, partition)); - return partitionList; - } - - @InterfaceAudience.LimitedPrivate({"Hive"}) - @InterfaceStability.Evolving - private static List> getPartitionKeyValues(Table table, PartitionSpecProxy partitionSpec) { - List> partitionList = new ArrayList>(); - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition partition = iterator.next(); - partitionList.add(getPartitionKeyValues(table, partition)); - } - return partitionList; + private static List> getPartitionKeyValues(final Table table, Iterator iterator) { + return Lists.newArrayList(Iterators.transform(iterator, new Function>() { + @Override + public Map apply(@Nullable Partition partition) { + return getPartitionKeyValues(table, partition); + } + })); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java index 4dbd26a..ead5a19 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import com.google.common.collect.Lists; import junit.framework.TestCase; import org.apache.hadoop.hive.cli.CliSessionState; @@ -296,7 +297,8 @@ public void testListener() throws Exception { AddPartitionEvent partEvent = (AddPartitionEvent)(notifyList.get(listSize-1)); assert partEvent.getStatus(); Partition part = msc.getPartition("hive2038", "tmptbl", "b=2011"); - validateAddPartition(part, partEvent.getPartitions().get(0)); + Partition partAdded = partEvent.getPartitionIterator().next(); + validateAddPartition(part, partAdded); validateTableInAddPartition(tbl, partEvent.getTable()); validateAddPartition(part, prePartEvent.getPartitions().get(0)); @@ -313,11 +315,12 @@ public void testListener() throws Exception { hmsClient.add_partitions(Arrays.asList(partition1, partition2, partition3)); ++listSize; AddPartitionEvent multiplePartitionEvent = (AddPartitionEvent)(notifyList.get(listSize-1)); - assertEquals("Unexpected number of partitions in event!", 3, multiplePartitionEvent.getPartitions().size()); assertEquals("Unexpected table value.", table, multiplePartitionEvent.getTable()); - assertEquals("Unexpected partition value.", partition1.getValues(), multiplePartitionEvent.getPartitions().get(0).getValues()); - assertEquals("Unexpected partition value.", partition2.getValues(), multiplePartitionEvent.getPartitions().get(1).getValues()); - assertEquals("Unexpected partition value.", partition3.getValues(), multiplePartitionEvent.getPartitions().get(2).getValues()); + List multiParts = Lists.newArrayList(multiplePartitionEvent.getPartitionIterator()); + assertEquals("Unexpected number of partitions in event!", 3, multiParts.size()); + assertEquals("Unexpected partition value.", partition1.getValues(), multiParts.get(0).getValues()); + assertEquals("Unexpected partition value.", partition2.getValues(), multiParts.get(1).getValues()); + assertEquals("Unexpected partition value.", partition3.getValues(), multiParts.get(2).getValues()); driver.run(String.format("alter table %s touch partition (%s)", tblName, "b='2011'")); listSize++; @@ -352,7 +355,8 @@ public void testListener() throws Exception { AddPartitionEvent appendPartEvent = (AddPartitionEvent)(notifyList.get(listSize-1)); - validateAddPartition(newPart, appendPartEvent.getPartitions().get(0)); + Partition partAppended = appendPartEvent.getPartitionIterator().next(); + validateAddPartition(newPart, partAppended); PreAddPartitionEvent preAppendPartEvent = (PreAddPartitionEvent)(preNotifyList.get(preNotifyList.size() - 1)); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java index 5b9c350..049cf71 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java @@ -61,18 +61,18 @@ public Table getTable() { return table; } - /** - * @return List of partitions. - */ - public List getPartitions() { - return partitions; - } + + // Note : List getPartitions() removed with HIVE-9609 because it will result in OOM errors with large add_partitions. /** * @return Iterator for partitions. */ public Iterator getPartitionIterator() { - return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator(); + if (partitions != null){ + return partitions.iterator(); + } else { + return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator(); + } } }