Index: src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 1136915) +++ src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (working copy) @@ -66,6 +66,7 @@ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); SessionState.start(new CliSessionState(hiveConf)); driver = new Driver(hiveConf); } @@ -75,7 +76,7 @@ Connection conn = connFac.createConnection(); conn.start(); Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC); + Destination hcatTopic = session.createTopic("planetlab.hcat"); consumer = session.createConsumer(hcatTopic); } @@ -85,7 +86,7 @@ driver.run("create database testconndb"); Message msg = consumer.receive(); assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); broker.stop(); driver.run("drop database testconndb cascade"); @@ -94,12 +95,12 @@ driver.run("create database testconndb"); msg = consumer.receive(); assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); driver.run("drop database testconndb cascade"); msg = consumer.receive(); assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); } catch (NoSuchObjectException nsoe){ nsoe.printStackTrace(System.err); Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1136915) +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (working copy) @@ -68,13 +68,13 @@ // We want message to be sent when session commits, thus we run in // transacted mode. Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC); + Destination hcatTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); MessageConsumer consumer1 = session.createConsumer(hcatTopic); consumer1.setMessageListener(this); - Destination tblTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl"); + Destination tblTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl"); MessageConsumer consumer2 = session.createConsumer(tblTopic); consumer2.setMessageListener(this); - Destination dbTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb"); + Destination dbTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb"); MessageConsumer consumer3 = session.createConsumer(dbTopic); consumer3.setMessageListener(this); hiveConf = new HiveConf(this.getClass()); @@ -112,7 +112,7 @@ event = msg.getStringProperty(HCatConstants.HCAT_EVENT); if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){ - assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); } else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){ @@ -153,7 +153,7 @@ } else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){ - assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); } else Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1136915) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -38,6 +38,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -133,7 +134,7 @@ // by listening on a topic named "HCAT" and message selector string // as "HCAT_EVENT = HCAT_ADD_DATABASE" if(dbEvent.getStatus()) - send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT); + send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT); } @Override @@ -142,7 +143,7 @@ // by listening on a topic named "HCAT" and message selector string // as "HCAT_EVENT = HCAT_DROP_DATABASE" if(dbEvent.getStatus()) - send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT); + send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT); } @Override @@ -155,23 +156,23 @@ Table tbl = tableEvent.getTable(); Table newTbl = tbl.deepCopy(); HMSHandler handler = tableEvent.getHandler(); - String namingPolicy = handler.getHiveConf().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAMING_POLICY, "tablename"); - newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, getTopicNameForParts(namingPolicy, tbl.getDbName(), tbl.getTableName())); + HiveConf conf = handler.getHiveConf(); + newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, + getTopicPrefix(conf) + "." + tbl.getDbName() +"." + tbl.getTableName()); try { handler.alter_table(tbl.getDbName(), tbl.getTableName(), newTbl); } catch (InvalidOperationException e) { throw new MetaException(e.toString()); } - send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC+"."+tbl.getDbName(), HCatConstants.HCAT_ADD_TABLE_EVENT); + send(tableEvent.getTable(),getTopicPrefix(conf)+ "."+ tbl.getDbName(), HCatConstants.HCAT_ADD_TABLE_EVENT); } } } - private String getTopicNameForParts(String namingPolicy, String dbName, String tblName){ - // we only have one policy now, so ignore policy param for now. - return HCatConstants.HCAT_TOPIC+"."+dbName+"."+tblName; + private String getTopicPrefix(HiveConf conf){ + return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); } - + @Override public void onDropTable(DropTableEvent tableEvent) throws MetaException { // Subscriber can get notification about drop of a table in HCAT @@ -188,7 +189,7 @@ sd.setSortCols(new ArrayList()); sd.setParameters(new HashMap()); sd.getSerdeInfo().setParameters(new HashMap()); - send(table,HCatConstants.HCAT_TOPIC+"."+table.getDbName(), HCatConstants.HCAT_DROP_TABLE_EVENT); + send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName(), HCatConstants.HCAT_DROP_TABLE_EVENT); } } @@ -246,7 +247,7 @@ } } - private void createConnection(){ + protected void createConnection(){ Context jndiCntxt; try { Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1136915) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -70,9 +70,10 @@ public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name"; public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy"; + public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix"; // Message Bus related properties. - public static final String HCAT_TOPIC = "HCAT"; + public static final String HCAT_DEFAULT_TOPIC_PREFIX = "HCAT"; public static final String HCAT_EVENT = "HCAT_EVENT"; public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";