Index: server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1407699) +++ server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -34,6 +34,7 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -266,7 +267,6 @@ * select messages in client side. */ protected void send(Object msgBody, String topicName, String event) { - try { Destination topic = null; @@ -280,18 +280,7 @@ return; } } - try { - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - topic = session.createTopic(topicName); - } catch (IllegalStateException ise) { - // this will happen if we were able to establish connection once, but - // its no longer valid, - // ise is thrown, catch it and retry. - LOG.error("Seems like connection is lost. Retrying", ise); - createConnection(); - topic = session.createTopic(topicName); - } + topic = getTopic(topicName); if (null == topic) { // Still not successful, return from here. LOG.error("Invalid session. Failed to send message on topic: " @@ -318,10 +307,34 @@ } catch (Exception e) { // Gobble up the exception. Message delivery is best effort. LOG.error("Failed to send message on topic: " + topicName + " event: " - + event, e); + + event, e); } } + /** + * Get the topic object for the topicName, it also tries to reconnect + * if the connection appears to be broken. + * + * @param topicName + * @return + * @throws JMSException + */ + protected Topic getTopic(final String topicName) throws JMSException { + Topic topic; + try { + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise) { + // this will happen if we were able to establish connection once, but its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + return topic; + } + protected void createConnection() { Context jndiCntxt;