Index: src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 0) +++ src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 0) @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; + +public class TestMsgBusConnection extends TestCase{ + + private Driver driver; + private BrokerService broker; + private MessageConsumer consumer; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + broker = new BrokerService(); + // configure the broker + broker.addConnector("tcp://localhost:61616?broker.persistent=false"); + + broker.start(); + + System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + private void connectClient() throws JMSException{ + ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC); + consumer = session.createConsumer(hcatTopic); + } + + public void testConnection() throws Exception{ + + try{ + driver.run("create database testconndb"); + Message msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + broker.stop(); + driver.run("drop database testconndb cascade"); + broker.start(true); + connectClient(); + driver.run("create database testconndb"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + } catch (NoSuchObjectException nsoe){ + nsoe.printStackTrace(System.err); + assert false; + } catch (AlreadyExistsException aee){ + aee.printStackTrace(System.err); + assert false; + } + } +} Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1135151) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.HashMap; -import javax.jdo.PersistenceManager; -import javax.jdo.Query; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; @@ -35,16 +35,11 @@ import javax.naming.InitialContext; import javax.naming.NamingException; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; -import org.apache.hadoop.hive.metastore.ObjectStore; -import org.apache.hadoop.hive.metastore.RawStore; -import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command; -import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -82,26 +77,7 @@ public NotificationListener(final Configuration conf) { super(conf); - try { - Context jndiCntxt = new InitialContext(); - ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory"); - conn = connFac.createConnection(); - conn.start(); - // We want message to be sent when session commits, thus we run in - // transacted mode. - session = conn.createSession(true, Session.SESSION_TRANSACTED); - - } catch (NamingException e) { - LOG.error("JNDI error while setting up Message Bus connection. " + - "Please make sure file named 'jndi.properties' is in " + - "classpath and contains appropriate key-value pairs.",e); - } - catch (JMSException e) { - LOG.error("Failed to initialize connection to message bus",e); - } - catch(Throwable t){ - LOG.error("HCAT Listener failed to load",t); - } + createConnection(); } @Override @@ -227,18 +203,25 @@ */ private void send(Serializable msgBody, String topicName, String event){ - if(null == session){ - // If we weren't able to setup the session in the constructor - // we cant send message in any case. - LOG.error("Invalid session. Failed to send message on topic: "+ - topicName + " event: "+event); - return; - } + try{ - try{ - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - Destination topic = session.createTopic(topicName); + Destination topic = null; + try{ + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise){ + // If connection is no longer valid, ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + if (null == topic){ + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: "+ + topicName + " event: "+event); + return; + } MessageProducer producer = session.createProducer(topic); ObjectMessage msg = session.createObjectMessage(msgBody); msg.setStringProperty(HCatConstants.HCAT_EVENT, event); @@ -252,13 +235,44 @@ } } + private void createConnection(){ + + Context jndiCntxt; + try { + jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory"); + Connection conn = connFac.createConnection(); + conn.start(); + conn.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException jmse) { + LOG.error(jmse); + } + }); + // We want message to be sent when session commits, thus we run in + // transacted mode. + session = conn.createSession(true, Session.SESSION_TRANSACTED); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.",e); + } catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus",e); + } catch(Throwable t){ + LOG.error("Unable to connect to JMS provider",t); + } + } + @Override protected void finalize() throws Throwable { // Close the connection before dying. try { + if (null != session) + session.close(); if(conn != null) { conn.close(); } + } catch (Exception ignore) { LOG.info("Failed to close message bus connection.", ignore); } Index: ivy.xml =================================================================== --- ivy.xml (revision 1135151) +++ ivy.xml (working copy) @@ -44,7 +44,7 @@ conf="common->master"/> --> - +