Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1195729) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -52,14 +52,15 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; -import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hcatalog.common.HCatConstants; /** @@ -87,6 +88,24 @@ createConnection(); } + private static String getTopicName(Partition partition, + ListenerEvent partitionEvent) throws MetaException { + try { + String topicName = partitionEvent.getHandler() + .get_table(partition.getDbName(), partition.getTableName()) + .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); + if (topicName == null) { + throw new MetaException( + "Topic name not found in metastore. Please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=dbname.tablename) or whatever you want topic name to be."); + } + return topicName; + } catch (NoSuchObjectException e) { + throw new MetaException(e.toString()); + } + } + @Override public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { // Subscriber can get notification of newly add partition in a @@ -95,13 +114,7 @@ if(partitionEvent.getStatus()){ Partition partition = partitionEvent.getPartition(); - String topicName; - try { - topicName = partitionEvent.getHandler().get_table( - partition.getDbName(), partition.getTableName()).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); - } catch (NoSuchObjectException e) { - throw new MetaException(e.toString()); - } + String topicName = getTopicName(partition, partitionEvent); send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); } @@ -123,13 +136,7 @@ sd.setSortCols(new ArrayList()); sd.setParameters(new HashMap()); sd.getSerdeInfo().setParameters(new HashMap()); - String topicName; - try { - topicName = partitionEvent.getHandler().get_table( - partition.getDbName(), partition.getTableName()).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); - } catch (NoSuchObjectException e) { - throw new MetaException(e.toString()); - } + String topicName = getTopicName(partition, partitionEvent); send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); } }