Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1406021) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -24,16 +24,8 @@ import java.util.Map; import java.util.Map.Entry; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; +import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -195,7 +187,7 @@ me.initCause(e); throw me; } - send(newTbl,getTopicPrefix(conf)+ "."+ newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT); + send(newTbl, getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT); } } @@ -236,7 +228,6 @@ try{ - Destination topic = null; if(null == session){ // this will happen, if we never able to establish a connection. createConnection(); @@ -247,24 +238,16 @@ return; } } - try{ - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - topic = session.createTopic(topicName); - } catch (IllegalStateException ise){ - // this will happen if we were able to establish connection once, but its no longer valid, - // ise is thrown, catch it and retry. - LOG.error("Seems like connection is lost. Retrying", ise); - createConnection(); - topic = session.createTopic(topicName); - } - if (null == topic){ - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: "+ - topicName + " event: "+event); - return; - } - MessageProducer producer = session.createProducer(topic); + Destination topic = null; + topic = getTopic(topicName); + if (null == topic){ + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: "+ + topicName + " event: "+ event); + return; + } + + MessageProducer producer = session.createProducer(topic); Message msg; if (msgBody instanceof Map){ MapMessage mapMsg = session.createMapMessage(); @@ -289,6 +272,22 @@ } } + protected Topic getTopic(final String topicName) throws JMSException { + Topic topic; + try{ + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise){ + // this will happen if we were able to establish connection once, but its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + return topic; + } + protected void createConnection(){ Context jndiCntxt;