diff --git a/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java b/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java index 7551a27..b47d9ff 100644 --- a/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java +++ b/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java @@ -24,9 +24,9 @@ import java.util.HashMap; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; @@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory; public class NotificationListener extends MetaStoreEventListener { private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); - protected Session session; protected Connection conn; private static MessageFactory messageFactory = MessageFactory.getInstance(); + public static final int NUM_RETRIES = 1; + private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check"; + private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE"; + + protected final ThreadLocal session = new ThreadLocal() { + @Override + protected Session initialValue() { + try { + return createSession(); + } catch (Exception e) { + LOG.error("Couldn't create JMS Session", e); + return null; + } + } + + @Override + public void remove() { + if( get() != null) { + try { + get().close(); + } catch (Exception e) { + LOG.error("Unable to close bad JMS session, ignored error", e); + } + } + super.remove(); + } + }; /** * Create message bus connection and session in constructor. */ public NotificationListener(final Configuration conf) { - super(conf); - createConnection(); + testAndCreateConnection(); } private static String getTopicName(Partition partition, @@ -216,7 +241,7 @@ public class NotificationListener extends MetaStoreEventListener { } } - private String getTopicPrefix(HiveConf conf) { + private String getTopicPrefix(Configuration conf) { return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); } @@ -253,109 +278,159 @@ public class NotificationListener extends MetaStoreEventListener { * @param topicName is the name on message broker on which message is sent. */ protected void send(HCatEventMessage hCatEventMessage, String topicName) { - try { - if(null == session){ - // this will happen, if we never able to establish a connection. - createConnection(); - if (null == session){ - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " + - topicName + " event: " + hCatEventMessage.getEventType()); - return; - } - } - - Destination topic = getTopic(topicName); + send(hCatEventMessage, topicName, NUM_RETRIES); + } - if (null == topic){ - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " + - topicName + " event: " + hCatEventMessage.getEventType()); - return; + /** + * @param hCatEventMessage The HCatEventMessage being sent over JMS, this method is threadsafe + * @param topicName is the name on message broker on which message is sent. + * @param retries the number of retry attempts + */ + protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) { + try { + if(session.get() == null) { + // Need to reconnect + throw new JMSException("Invalid JMS session"); } - - MessageProducer producer = session.createProducer(topic); - Message msg = session.createTextMessage(hCatEventMessage.toString()); + Destination topic = createTopic(topicName); + Message msg = session.get().createTextMessage(hCatEventMessage.toString()); msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString()); msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion()); msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat()); + MessageProducer producer = createProducer(topic); producer.send(msg); // Message must be transacted before we return. - session.commit(); - } - catch(Exception e){ - // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: " + topicName + - " event: " + hCatEventMessage.getEventType(), e); + session.get().commit(); + } catch(Exception e){ + if(retries >= 0) { + // this may happen if we were able to establish connection once, but its no longer valid + LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e); + testAndCreateConnection(); + send(hCatEventMessage, topicName, retries-1); + } else { + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: " + topicName + + " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e); + } } } /** - * Get the topic object for the topicName, it also tries to reconnect - * if the connection appears to be broken. + * Get the topic object for the topicName * * @param topicName The String identifying the message-topic. * @return A {@link Topic} object corresponding to the specified topicName. * @throws JMSException */ - protected Topic getTopic(final String topicName) throws JMSException { - Topic topic; + protected Topic createTopic(final String topicName) throws JMSException { + return session.get().createTopic(topicName); + } + + /** + * Does a health check on the connection by sending a dummy message. + * Create the connection if the connection is found to be bad + * Also recreates the session + */ + protected synchronized void testAndCreateConnection() { + if(conn != null) { + // This method is reached when error occurs while sending msg, so the session must be bad + session.remove(); + if (!isConnectionHealthy()) { + // I am the first thread to detect the error, cleanup old connection & reconnect + try { + conn.close(); + } catch (Exception e) { + LOG.error("Unable to close bad JMS connection, ignored error", e); + } + conn = createConnection(); + } + } else { + conn = createConnection(); + } try { - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - topic = session.createTopic(topicName); - } catch (IllegalStateException ise) { - // this will happen if we were able to establish connection once, but its no longer valid, - // ise is thrown, catch it and retry. - LOG.error("Seems like connection is lost. Retrying", ise); - createConnection(); - topic = session.createTopic(topicName); + session.set(createSession()); + } catch (JMSException e) { + LOG.error("Couldn't create JMS session, ignored the error", e); } - return topic; } - protected void createConnection() { + /** + * Create the JMS connection + * @return newly created JMS connection + */ + protected Connection createConnection() { + LOG.info("Will create new JMS connection"); + Context jndiCntxt; + Connection jmsConnection = null; + try { + jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory"); + jmsConnection = connFac.createConnection(); + jmsConnection.start(); + jmsConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException jmse) { + LOG.error("JMS Exception listener received exception. Ignored the error", jmse); + } + }); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.", e); + } catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus", e); + } catch (Throwable t) { + LOG.error("Unable to connect to JMS provider", t); + } + return jmsConnection; + } - Context jndiCntxt; + /** + * Send a dummy message to probe if the JMS connection is healthy + * @return true if connection is healthy, false otherwise + */ + protected boolean isConnectionHealthy() { try { - jndiCntxt = new InitialContext(); - ConnectionFactory connFac = (ConnectionFactory) jndiCntxt - .lookup("ConnectionFactory"); - Connection conn = connFac.createConnection(); - conn.start(); - conn.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException jmse) { - LOG.error(jmse.toString()); - } - }); - // We want message to be sent when session commits, thus we run in - // transacted mode. - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } catch (NamingException e) { - LOG.error("JNDI error while setting up Message Bus connection. " - + "Please make sure file named 'jndi.properties' is in " - + "classpath and contains appropriate key-value pairs.", e); - } catch (JMSException e) { - LOG.error("Failed to initialize connection to message bus", e); - } catch (Throwable t) { - LOG.error("Unable to connect to JMS provider", t); + Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX); + MessageProducer producer = createProducer(topic); + Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG); + producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0); + } catch (Exception e) { + return false; } + return true; + } + + /** + * Creates a JMS session + * @return newly create JMS session + * @throws JMSException + */ + protected Session createSession() throws JMSException { + // We want message to be sent when session commits, thus we run in + // transacted mode. + return conn.createSession(true, Session.SESSION_TRANSACTED); + } + + /** + * Create a JMS producer + * @param topic + * @return newly created message producer + * @throws JMSException + */ + protected MessageProducer createProducer(Destination topic) throws JMSException { + return session.get().createProducer(topic); } @Override protected void finalize() throws Throwable { - // Close the connection before dying. - try { - if (null != session) - session.close(); - if (conn != null) { + if (conn != null) { + try { conn.close(); + } catch (Exception e) { + LOG.error("Couldn't close jms connection, ignored the error", e); } - - } catch (Exception ignore) { - LOG.info("Failed to close message bus connection.", ignore); } }