diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 664248d..c35c9bd 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -19,11 +19,7 @@ package org.apache.hive.hcatalog.listener; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; @@ -45,19 +41,14 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.HCatEventMessage; @@ -65,26 +56,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hive.hcatalog.common.HCatConstants.HCAT_MSGBUS_TOPIC_NAME; + /** - * Implementation of - * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends - * message on two type of topics. One has name of form dbName.tblName On this - * topic, two kind of messages are sent: add/drop partition and - * finalize_partition message. Second topic has name "HCAT" and messages sent on - * it are: add/drop database and add/drop table. All messages also has a - * property named "HCAT_EVENT" set on them whose value can be used to configure - * message selector on subscriber side. + * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that publishes + * messages to JMS topics. The JMS connection is being created using JNDI using the name + * {@code ConnectionFactory}. */ public class NotificationListener extends MetaStoreEventListener { - private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); - protected Connection conn; - private static MessageFactory messageFactory = MessageFactory.getInstance(); public static final int NUM_RETRIES = 1; + private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); + private static final MessageFactory MESSAGE_FACTORY = MessageFactory.getInstance(); private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check"; private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE"; - - protected final ThreadLocal session = new ThreadLocal() { + private final ThreadLocal session = new ThreadLocal() { @Override protected Session initialValue() { try { @@ -107,180 +93,210 @@ public void remove() { super.remove(); } }; + private Connection conn; + + private static String getTopicName(Table table) { + return table.getParameters().get(HCAT_MSGBUS_TOPIC_NAME); + } /** * Create message bus connection and session in constructor. */ - public NotificationListener(final Configuration conf) { + public NotificationListener(Configuration conf) { super(conf); testAndCreateConnection(); } - private static String getTopicName(Table table, ListenerEvent partitionEvent) { - return table.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); + /** + * Publishes a new message to a topic for every table that is created. Additionally it adds a + * property to the table + * ({@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_NAME}) + * that names the topic name for later modifications regarding this table. + *

+ * The topic is named {@code .}. + * The prefix can be configured using the property + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX} + * which defaults to + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}. + *

+ * This message selector can be used to filter these messages: + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_CREATE_TABLE_EVENT} + */ + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + if (tableEvent.getStatus()) { + Table tbl = tableEvent.getTable(); + HMSHandler handler = tableEvent.getHandler(); + HiveConf conf = handler.getHiveConf(); + Table newTbl; + try { + newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()).deepCopy(); + + // Add a property to the table that sets the topic name for other notifications regarding + // this table (e.g. add partition). Only do this if the property hasn't already been set. + if (!newTbl.getParameters().containsKey(HCAT_MSGBUS_TOPIC_NAME)) { + newTbl.getParameters() + .put(HCAT_MSGBUS_TOPIC_NAME, getTopicPrefix(conf) + + "." + + newTbl.getDbName().toLowerCase() + + "." + + newTbl.getTableName().toLowerCase()); + handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); + } + } catch (InvalidOperationException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } catch (NoSuchObjectException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } + String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); + send(MESSAGE_FACTORY.buildCreateTableMessage(newTbl), topicName); + } + } + + /** + * Publishes a new message to a topic for every table that is dropped. + *

+ * The topic is named {@code .}. + * The prefix can be configured using the property + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX} + * which defaults to + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}. + *

+ * This message selector can be used to filter these messages: + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT} + */ + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + if (tableEvent.getStatus()) { + Table table = tableEvent.getTable(); + String topicName = + getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName() + .toLowerCase(); + send(MESSAGE_FACTORY.buildDropTableMessage(table), topicName); + } } + /** + * Sends a notification for every added partition. To enable this for a particular table the table + * must have a property + * {@link org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_NAME} that names the + * topic to publish to. A message selector of + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_ADD_PARTITION_EVENT} can be used + * to filter these notifications. + */ @Override - public void onAddPartition(AddPartitionEvent partitionEvent) - throws MetaException { - // Subscriber can get notification of newly add partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { if (partitionEvent.getStatus()) { Table table = partitionEvent.getTable(); List partitions = partitionEvent.getPartitions(); - String topicName = getTopicName(table, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(table, partitions), topicName); + String topicName = getTopicName(table); + if (topicName == null || topicName.isEmpty()) { + LOG.info( + "Topic name not found in Metastore. Suppressing HCatalog notification for {}.{} To enable" + + " notifications for this table, please do ALTER TABLE SET PROPERTIES (" + + HCAT_MSGBUS_TOPIC_NAME + "=", + partitions.get(0).getDbName(), partitions.get(0).getTableName()); } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partitions.get(0).getDbName() - + "." - + partitions.get(0).getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); + send(MESSAGE_FACTORY.buildAddPartitionMessage(table, partitions), topicName); } } } /** - * Send dropped partition notifications. Subscribers can receive these notifications for a - * particular table by listening on a topic named "dbName.tableName" with message selector - * string {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = - * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}. - *
- * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been - * found to throw NPE when serializing objects that contain null. For this reason we override - * some fields in the StorageDescriptor of this notification. This should be fixed after - * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. + * Sends a notification for every added partition. To enable this for a particular table the table + * must have a property + * {@link org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_NAME} that names the + * topic to publish to. A message selector of + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT} can be used + * to filter these notifications. */ @Override public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { if (partitionEvent.getStatus()) { + Table table = partitionEvent.getTable(); Partition partition = partitionEvent.getPartition(); - StorageDescriptor sd = partition.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSkewedInfo().setSkewedColNames(new ArrayList()); - String topicName = getTopicName(partitionEvent.getTable(), partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName); + String topicName = getTopicName(table); + if (topicName == null || topicName.isEmpty()) { + LOG.info( + "Topic name not found in Metastore. Suppressing HCatalog notification for {}.{} To enable" + + " notifications for this table, please do ALTER TABLE SET PROPERTIES (" + + HCAT_MSGBUS_TOPIC_NAME + "=", + partition.getDbName(), partition.getTableName()); } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() - + "." - + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); + send(MESSAGE_FACTORY.buildDropPartitionMessage(table, partition), topicName); } } } + /** + * Publishes a new message to a topic for every database that is created. + *

+ * The topic can be specified using the property + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX} + * which defaults to + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}. + *

+ * This message selector can be used to filter these messages: + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_CREATE_DATABASE_EVENT} + */ @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) - throws MetaException { - // Subscriber can get notification about addition of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_DATABASE" + public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { if (dbEvent.getStatus()) { String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); - send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName); + send(MESSAGE_FACTORY.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName); } } + /** + * Publishes a new message to a topic for every database that is dropped. + *

+ * The topic can be specified using the property + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX} + * which defaults to + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}. + *

+ * This message selector can be used to filter these messages: + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DROP_DATABASE_EVENT} + */ @Override public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about drop of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_DATABASE" if (dbEvent.getStatus()) { String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); - send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName); + send(MESSAGE_FACTORY.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName); } } @Override - public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about addition of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_TABLE" - if (tableEvent.getStatus()) { - Table tbl = tableEvent.getTable(); - HMSHandler handler = tableEvent.getHandler(); - HiveConf conf = handler.getHiveConf(); - Table newTbl; - try { - newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) - .deepCopy(); - newTbl.getParameters().put( - HCatConstants.HCAT_MSGBUS_TOPIC_NAME, - getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." - + newTbl.getTableName().toLowerCase()); - handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); - } catch (InvalidOperationException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } catch (NoSuchObjectException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } - String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); - send(messageFactory.buildCreateTableMessage(newTbl), topicName); - } - } - - private String getTopicPrefix(Configuration conf) { - return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, - HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - } - - /** - * Send dropped table notifications. Subscribers can receive these notifications for - * dropped tables by listening on topic "HCAT" with message selector string - * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_EVENT} = - * {@value org.apache.hive.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT} - *
- * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been - * found to throw NPE when serializing objects that contain null. For this reason we override - * some fields in the StorageDescriptor of this notification. This should be fixed after - * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. - */ - @Override - public void onDropTable(DropTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about drop of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_TABLE" - - // Datanucleus throws NPE when we try to serialize a table object - // retrieved from metastore. To workaround that we reset following objects - - if (tableEvent.getStatus()) { - Table table = tableEvent.getTable(); - String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase(); - send(messageFactory.buildDropTableMessage(table), topicName); - } + public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) throws MetaException { +// if(lpde.getStatus()) +// send(lpde.getPartitionName(), +// lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), HCatConstants.HCAT_PARTITION_DONE_EVENT); } /** - * @param hCatEventMessage The HCatEventMessage being sent over JMS. - * @param topicName is the name on message broker on which message is sent. + * @param hCatEventMessage The HCatEventMessage being sent over JMS + * @param topicName is the name on message broker on which message is sent */ - protected void send(HCatEventMessage hCatEventMessage, String topicName) { + private void send(HCatEventMessage hCatEventMessage, String topicName) { send(hCatEventMessage, topicName, NUM_RETRIES); } /** * @param hCatEventMessage The HCatEventMessage being sent over JMS, this method is threadsafe - * @param topicName is the name on message broker on which message is sent. - * @param retries the number of retry attempts + * @param topicName is the name on message broker on which message is sent + * @param retries the number of retry attempts */ - protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) { + private void send(HCatEventMessage hCatEventMessage, String topicName, int retries) { try { if (session.get() == null) { // Need to reconnect @@ -290,8 +306,8 @@ protected void send(HCatEventMessage hCatEventMessage, String topicName, int ret Message msg = session.get().createTextMessage(hCatEventMessage.toString()); msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString()); - msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion()); - msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat()); + msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, MESSAGE_FACTORY.getVersion()); + msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, MESSAGE_FACTORY.getMessageFormat()); MessageProducer producer = createProducer(topic); producer.send(msg); // Message must be transacted before we return. @@ -299,66 +315,39 @@ protected void send(HCatEventMessage hCatEventMessage, String topicName, int ret } catch (Exception e) { if (retries >= 0) { // this may happen if we were able to establish connection once, but its no longer valid - LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e); + LOG.error("Seems like connection is lost. Will retry. Retries left : {}. error was:", + retries, e); testAndCreateConnection(); send(hCatEventMessage, topicName, retries - 1); } else { // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: " + topicName + - " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e); + LOG.error("Failed to send message on topic: {} event: {} after " + NUM_RETRIES + " retries", + topicName, hCatEventMessage.getEventType(), e); } } } /** - * Get the topic object for the topicName + * Get the topic object for the topicName. * - * @param topicName The String identifying the message-topic. - * @return A {@link Topic} object corresponding to the specified topicName. - * @throws JMSException + * @param topicName The String identifying the message-topic + * + * @return A {@link Topic} object corresponding to the specified topicName */ - protected Topic createTopic(final String topicName) throws JMSException { + private Topic createTopic(String topicName) throws JMSException { return session.get().createTopic(topicName); } /** - * Does a health check on the connection by sending a dummy message. - * Create the connection if the connection is found to be bad - * Also recreates the session - */ - protected synchronized void testAndCreateConnection() { - if (conn != null) { - // This method is reached when error occurs while sending msg, so the session must be bad - session.remove(); - if (!isConnectionHealthy()) { - // I am the first thread to detect the error, cleanup old connection & reconnect - try { - conn.close(); - } catch (Exception e) { - LOG.error("Unable to close bad JMS connection, ignored error", e); - } - conn = createConnection(); - } - } else { - conn = createConnection(); - } - try { - session.set(createSession()); - } catch (JMSException e) { - LOG.error("Couldn't create JMS session, ignored the error", e); - } - } - - /** - * Create the JMS connection + * Create the JMS connection. + * * @return newly created JMS connection */ - protected Connection createConnection() { + private Connection createConnection() { LOG.info("Will create new JMS connection"); - Context jndiCntxt; Connection jmsConnection = null; try { - jndiCntxt = new InitialContext(); + Context jndiCntxt = new InitialContext(); ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory"); jmsConnection = connFac.createConnection(); jmsConnection.start(); @@ -370,8 +359,8 @@ public void onException(JMSException jmse) { }); } catch (NamingException e) { LOG.error("JNDI error while setting up Message Bus connection. " - + "Please make sure file named 'jndi.properties' is in " - + "classpath and contains appropriate key-value pairs.", e); + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.", e); } catch (JMSException e) { LOG.error("Failed to initialize connection to message bus", e); } catch (Throwable t) { @@ -381,10 +370,11 @@ public void onException(JMSException jmse) { } /** - * Send a dummy message to probe if the JMS connection is healthy + * Send a dummy message to probe if the JMS connection is healthy. + * * @return true if connection is healthy, false otherwise */ - protected boolean isConnectionHealthy() { + private boolean isConnectionHealthy() { try { Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX); MessageProducer producer = createProducer(topic); @@ -397,52 +387,56 @@ protected boolean isConnectionHealthy() { } /** - * Creates a JMS session - * @return newly create JMS session - * @throws JMSException + * Creates a JMS session. + * + * @return newly created JMS session */ - protected Session createSession() throws JMSException { - // We want message to be sent when session commits, thus we run in - // transacted mode. + private Session createSession() throws JMSException { + // We want message to be sent when session commits, thus we run in transacted mode. return conn.createSession(true, Session.SESSION_TRANSACTED); } /** - * Create a JMS producer - * @param topic + * Create a JMS producer. + * * @return newly created message producer - * @throws JMSException */ - protected MessageProducer createProducer(Destination topic) throws JMSException { + private MessageProducer createProducer(Destination topic) throws JMSException { return session.get().createProducer(topic); } - @Override - protected void finalize() throws Throwable { - if (conn != null) { - try { - conn.close(); - } catch (Exception e) { - LOG.error("Couldn't close jms connection, ignored the error", e); - } - } + private String getTopicPrefix(Configuration conf) { + return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); } - @Override - public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) - throws MetaException { -// TODO: Fix LoadPartitionDoneEvent. Currently, LPDE can only carry a single partition-spec. And that defeats the purpose. -// if(lpde.getStatus()) -// send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT); - } + /** + * Does a health check on the connection by sending a dummy message. + * Create the connection if the connection is found to be bad + * Also recreates the session + */ + private synchronized void testAndCreateConnection() { + if (conn == null) { + conn = createConnection(); + } else { + // This method is reached when an error occurs while sending msg, so the session must be bad + session.remove(); + if (!isConnectionHealthy()) { + // I am the first thread to detect the error, cleanup old connection & reconnect + try { + conn.close(); + } catch (Exception e) { + LOG.error("Unable to close bad JMS connection, ignored error", e); + } + conn = createConnection(); + } + } - @Override - public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { - // no-op + try { + session.set(createSession()); + } catch (JMSException e) { + LOG.error("Couldn't create JMS session, ignored the error", e); + } } - @Override - public void onAlterTable(AlterTableEvent ate) throws MetaException { - // no-op - } }