diff --git ant/deploy.xml ant/deploy.xml index 84c3648..16e247d 100644 --- ant/deploy.xml +++ ant/deploy.xml @@ -20,13 +20,50 @@ - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git build-common.xml build-common.xml index 8740772..c5feb40 100644 --- build-common.xml +++ build-common.xml @@ -77,7 +77,7 @@ - + diff --git build.xml build.xml index b7fc8a1..8eb723f 100644 --- build.xml +++ build.xml @@ -127,12 +127,6 @@ - - - @@ -169,26 +163,12 @@ - - - - - - - - - + + @@ -197,6 +177,7 @@ + @@ -247,6 +228,7 @@ <_junit srcDir="${basedir}/src/test"/> + @@ -256,6 +238,7 @@ description="run unit tests and generate code coverage reports"> <_junit srcDir="${basedir}/src/test"/> + - - - - - - - - - - - - + diff --git server-extensions/build.xml server-extensions/build.xml new file mode 100644 index 0000000..f753ce2 --- /dev/null +++ server-extensions/build.xml @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git server-extensions/pom.xml server-extensions/pom.xml new file mode 100644 index 0000000..ee974ad --- /dev/null +++ server-extensions/pom.xml @@ -0,0 +1,83 @@ + + + + + + org.apache.hcatalog + hcatalog + ${hcatalog.version} + ../pom.xml + + + 4.0.0 + org.apache.hcatalog + hcatalog-server-extensions + jar + ${hcatalog.version} + server-extensions + http://maven.apache.org + + + + javax.jms + jms + ${jms.version} + compile + + + org.apache.activemq + activemq-core + ${activemq.version} + compile + + + org.springframework + spring-context + + + + + org.apache.activemq + kahadb + ${activemq.version} + compile + + + org.apache.hcatalog + hcatalog-core + ${hcatalog.version} + compile + + + + + junit + junit + ${junit.version} + test + + + org.apache.hadoop + hadoop-test + ${hadoop20.version} + test + + + diff --git server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java new file mode 100644 index 0000000..5c6b8ad --- /dev/null +++ server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hcatalog.common.HCatConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of + * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends + * message on two type of topics. One has name of form dbName.tblName On this + * topic, two kind of messages are sent: add/drop partition and + * finalize_partition message. Second topic has name "HCAT" and messages sent on + * it are: add/drop database and add/drop table. All messages also has a + * property named "HCAT_EVENT" set on them whose value can be used to configure + * message selector on subscriber side. + */ +public class NotificationListener extends MetaStoreEventListener { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); + protected Session session; + protected Connection conn; + + /** + * Create message bus connection and session in constructor. + */ + public NotificationListener(final Configuration conf) { + + super(conf); + createConnection(); + } + + private static String getTopicName(Partition partition, + ListenerEvent partitionEvent) throws MetaException { + try { + return partitionEvent.getHandler() + .get_table(partition.getDbName(), partition.getTableName()) + .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); + } catch (NoSuchObjectException e) { + throw new MetaException(e.toString()); + } + } + + @Override + public void onAddPartition(AddPartitionEvent partitionEvent) + throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + if (partitionEvent.getStatus()) { + + Partition partition = partitionEvent.getPartition(); + String topicName = getTopicName(partition, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); + } else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + + partition.getDbName() + + "." + + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + + } + + /** + * Send dropped partition notifications. Subscribers can receive these notifications for a + * particular table by listening on a topic named "dbName.tableName" with message selector + * string {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}. + *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been + * found to throw NPE when serializing objects that contain null. For this reason we override + * some fields in the StorageDescriptor of this notification. This should be fixed after + * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. + */ + @Override + public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { + if (partitionEvent.getStatus()) { + Partition partition = partitionEvent.getPartition(); + StorageDescriptor sd = partition.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSkewedInfo().setSkewedColNames(new ArrayList()); + String topicName = getTopicName(partition, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); + } else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + + partition.getDbName() + + "." + + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + } + + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) + throws MetaException { + // Subscriber can get notification about addition of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_DATABASE" + if (dbEvent.getStatus()) + send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() + .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT); + } + + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about drop of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_DATABASE" + if (dbEvent.getStatus()) + send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() + .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT); + } + + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about addition of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_TABLE" + if (tableEvent.getStatus()) { + Table tbl = tableEvent.getTable(); + HMSHandler handler = tableEvent.getHandler(); + HiveConf conf = handler.getHiveConf(); + Table newTbl; + try { + newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) + .deepCopy(); + newTbl.getParameters().put( + HCatConstants.HCAT_MSGBUS_TOPIC_NAME, + getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." + + newTbl.getTableName().toLowerCase()); + handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); + } catch (InvalidOperationException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } catch (NoSuchObjectException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } + send(newTbl, getTopicPrefix(conf) + "." + + newTbl.getDbName().toLowerCase(), + HCatConstants.HCAT_ADD_TABLE_EVENT); + } + } + + private String getTopicPrefix(HiveConf conf) { + return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + } + + /** + * Send dropped table notifications. Subscribers can receive these notifications for + * dropped tables by listening on topic "HCAT" with message selector string + * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} = + * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT} + *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been + * found to throw NPE when serializing objects that contain null. For this reason we override + * some fields in the StorageDescriptor of this notification. This should be fixed after + * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. + */ + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about drop of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_TABLE" + + // Datanucleus throws NPE when we try to serialize a table object + // retrieved from metastore. To workaround that we reset following objects + + if (tableEvent.getStatus()) { + Table table = tableEvent.getTable(); + StorageDescriptor sd = table.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSkewedInfo().setSkewedColNames(new ArrayList()); + send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + + table.getDbName().toLowerCase(), + HCatConstants.HCAT_DROP_TABLE_EVENT); + } + } + + /** + * @param msgBody + * is the metastore object. It is sent in full such that if + * subscriber is really interested in details, it can reconstruct it + * fully. In case of finalize_partition message this will be string + * specification of the partition. + * @param topicName + * is the name on message broker on which message is sent. + * @param event + * is the value of HCAT_EVENT property in message. It can be used to + * select messages in client side. + */ + protected void send(Object msgBody, String topicName, String event) { + + try { + + Destination topic = null; + if (null == session) { + // this will happen, if we never able to establish a connection. + createConnection(); + if (null == session) { + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + event); + return; + } + } + try { + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise) { + // this will happen if we were able to establish connection once, but + // its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + if (null == topic) { + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + event); + return; + } + MessageProducer producer = session.createProducer(topic); + Message msg; + if (msgBody instanceof Map) { + MapMessage mapMsg = session.createMapMessage(); + Map incomingMap = (Map) msgBody; + for (Entry partCol : incomingMap.entrySet()) { + mapMsg.setString(partCol.getKey(), partCol.getValue()); + } + msg = mapMsg; + } else { + msg = session.createObjectMessage((Serializable) msgBody); + } + + msg.setStringProperty(HCatConstants.HCAT_EVENT, event); + producer.send(msg); + // Message must be transacted before we return. + session.commit(); + } catch (Exception e) { + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: " + topicName + " event: " + + event, e); + } + } + + protected void createConnection() { + + Context jndiCntxt; + try { + jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory) jndiCntxt + .lookup("ConnectionFactory"); + Connection conn = connFac.createConnection(); + conn.start(); + conn.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException jmse) { + LOG.error(jmse.toString()); + } + }); + // We want message to be sent when session commits, thus we run in + // transacted mode. + session = conn.createSession(true, Session.SESSION_TRANSACTED); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.", e); + } catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus", e); + } catch (Throwable t) { + LOG.error("Unable to connect to JMS provider", t); + } + } + + @Override + protected void finalize() throws Throwable { + // Close the connection before dying. + try { + if (null != session) + session.close(); + if (conn != null) { + conn.close(); + } + + } catch (Exception ignore) { + LOG.info("Failed to close message bus connection.", ignore); + } + } + + @Override + public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) + throws MetaException { + if (lpde.getStatus()) + send( + lpde.getPartitionName(), + lpde.getTable().getParameters() + .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), + HCatConstants.HCAT_PARTITION_DONE_EVENT); + } + + @Override + public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { + // no-op + } + + @Override + public void onAlterTable(AlterTableEvent ate) throws MetaException { + // no-op + } +} diff --git server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java new file mode 100644 index 0000000..0db2b16 --- /dev/null +++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; + +public class TestMsgBusConnection extends TestCase { + + private Driver driver; + private BrokerService broker; + private MessageConsumer consumer; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + broker = new BrokerService(); + // configure the broker + broker.addConnector("tcp://localhost:61616?broker.persistent=false"); + + broker.start(); + + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + private void connectClient() throws JMSException { + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "tcp://localhost:61616"); + Connection conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic("planetlab.hcat"); + consumer = session.createConsumer(hcatTopic); + } + + public void testConnection() throws Exception { + + try { + driver.run("create database testconndb"); + Message msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + broker.stop(); + driver.run("drop database testconndb cascade"); + broker.start(true); + connectClient(); + driver.run("create database testconndb"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } catch (NoSuchObjectException nsoe) { + nsoe.printStackTrace(System.err); + assert false; + } catch (AlreadyExistsException aee) { + aee.printStackTrace(System.err); + assert false; + } + } +} diff --git server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java new file mode 100644 index 0000000..9a66427 --- /dev/null +++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.mapreduce.HCatBaseTest; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestNotificationListener extends HCatBaseTest implements MessageListener { + + private List actualMessages = new ArrayList(); + + @Before + public void setUp() throws Exception { + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", + "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Destination dbTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); + MessageConsumer consumer3 = session.createConsumer(dbTopic); + consumer3.setMessageListener(this); + + setUpHiveConf(); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + client = new HiveMetaStoreClient(hiveConf); + } + + @After + public void tearDown() throws Exception { + List expectedMessages = Arrays.asList( + HCatConstants.HCAT_ADD_DATABASE_EVENT, + HCatConstants.HCAT_ADD_TABLE_EVENT, + HCatConstants.HCAT_ADD_PARTITION_EVENT, + HCatConstants.HCAT_PARTITION_DONE_EVENT, + HCatConstants.HCAT_DROP_PARTITION_EVENT, + HCatConstants.HCAT_DROP_TABLE_EVENT, + HCatConstants.HCAT_DROP_DATABASE_EVENT); + Assert.assertEquals(expectedMessages, actualMessages); + } + + @Test + public void testAMQListener() throws Exception { + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + Map kvs = new HashMap(1); + kvs.put("b", "2011"); + client.markPartitionForEvent("mydb", "mytbl", kvs, + PartitionEventType.LOAD_DONE); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } + + @Override + public void onMessage(Message msg) { + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + actualMessages.add(event); + + if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { + + Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + Assert.assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { + + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + Assert.assertEquals("mytbl", tbl.getTableName()); + Assert.assertEquals("mydb", tbl.getDbName()); + Assert.assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { + + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + Assert.assertEquals("mytbl", part.getTableName()); + Assert.assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + Assert.assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { + + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + Assert.assertEquals("mytbl", part.getTableName()); + Assert.assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + Assert.assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { + + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + Assert.assertEquals("mytbl", tbl.getTableName()); + Assert.assertEquals("mydb", tbl.getDbName()); + Assert.assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { + + Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + Assert.assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + MapMessage mapMsg = (MapMessage) msg; + assert mapMsg.getString("b").equals("2011"); + } else + assert false; + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + } + } +} diff --git src/java/org/apache/hcatalog/listener/NotificationListener.java src/java/org/apache/hcatalog/listener/NotificationListener.java deleted file mode 100644 index 5c6b8ad..0000000 --- src/java/org/apache/hcatalog/listener/NotificationListener.java +++ /dev/null @@ -1,389 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.listener; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; -import org.apache.hadoop.hive.metastore.MetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; -import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; -import org.apache.hadoop.hive.metastore.events.CreateTableEvent; -import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; -import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; -import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; -import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; -import org.apache.hcatalog.common.HCatConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of - * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends - * message on two type of topics. One has name of form dbName.tblName On this - * topic, two kind of messages are sent: add/drop partition and - * finalize_partition message. Second topic has name "HCAT" and messages sent on - * it are: add/drop database and add/drop table. All messages also has a - * property named "HCAT_EVENT" set on them whose value can be used to configure - * message selector on subscriber side. - */ -public class NotificationListener extends MetaStoreEventListener { - - private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); - protected Session session; - protected Connection conn; - - /** - * Create message bus connection and session in constructor. - */ - public NotificationListener(final Configuration conf) { - - super(conf); - createConnection(); - } - - private static String getTopicName(Partition partition, - ListenerEvent partitionEvent) throws MetaException { - try { - return partitionEvent.getHandler() - .get_table(partition.getDbName(), partition.getTableName()) - .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); - } catch (NoSuchObjectException e) { - throw new MetaException(e.toString()); - } - } - - @Override - public void onAddPartition(AddPartitionEvent partitionEvent) - throws MetaException { - // Subscriber can get notification of newly add partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" - if (partitionEvent.getStatus()) { - - Partition partition = partitionEvent.getPartition(); - String topicName = getTopicName(partition, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); - } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() - + "." - + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); - } - } - - } - - /** - * Send dropped partition notifications. Subscribers can receive these notifications for a - * particular table by listening on a topic named "dbName.tableName" with message selector - * string {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} = - * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}. - *
- * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been - * found to throw NPE when serializing objects that contain null. For this reason we override - * some fields in the StorageDescriptor of this notification. This should be fixed after - * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. - */ - @Override - public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { - if (partitionEvent.getStatus()) { - Partition partition = partitionEvent.getPartition(); - StorageDescriptor sd = partition.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSkewedInfo().setSkewedColNames(new ArrayList()); - String topicName = getTopicName(partition, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); - } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() - + "." - + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); - } - } - } - - @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) - throws MetaException { - // Subscriber can get notification about addition of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_DATABASE" - if (dbEvent.getStatus()) - send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() - .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT); - } - - @Override - public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about drop of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_DATABASE" - if (dbEvent.getStatus()) - send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() - .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT); - } - - @Override - public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about addition of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_TABLE" - if (tableEvent.getStatus()) { - Table tbl = tableEvent.getTable(); - HMSHandler handler = tableEvent.getHandler(); - HiveConf conf = handler.getHiveConf(); - Table newTbl; - try { - newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) - .deepCopy(); - newTbl.getParameters().put( - HCatConstants.HCAT_MSGBUS_TOPIC_NAME, - getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." - + newTbl.getTableName().toLowerCase()); - handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); - } catch (InvalidOperationException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } catch (NoSuchObjectException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } - send(newTbl, getTopicPrefix(conf) + "." - + newTbl.getDbName().toLowerCase(), - HCatConstants.HCAT_ADD_TABLE_EVENT); - } - } - - private String getTopicPrefix(HiveConf conf) { - return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, - HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - } - - /** - * Send dropped table notifications. Subscribers can receive these notifications for - * dropped tables by listening on topic "HCAT" with message selector string - * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} = - * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT} - *
- * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been - * found to throw NPE when serializing objects that contain null. For this reason we override - * some fields in the StorageDescriptor of this notification. This should be fixed after - * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. - */ - @Override - public void onDropTable(DropTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about drop of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_TABLE" - - // Datanucleus throws NPE when we try to serialize a table object - // retrieved from metastore. To workaround that we reset following objects - - if (tableEvent.getStatus()) { - Table table = tableEvent.getTable(); - StorageDescriptor sd = table.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSkewedInfo().setSkewedColNames(new ArrayList()); - send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." - + table.getDbName().toLowerCase(), - HCatConstants.HCAT_DROP_TABLE_EVENT); - } - } - - /** - * @param msgBody - * is the metastore object. It is sent in full such that if - * subscriber is really interested in details, it can reconstruct it - * fully. In case of finalize_partition message this will be string - * specification of the partition. - * @param topicName - * is the name on message broker on which message is sent. - * @param event - * is the value of HCAT_EVENT property in message. It can be used to - * select messages in client side. - */ - protected void send(Object msgBody, String topicName, String event) { - - try { - - Destination topic = null; - if (null == session) { - // this will happen, if we never able to establish a connection. - createConnection(); - if (null == session) { - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " - + topicName + " event: " + event); - return; - } - } - try { - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - topic = session.createTopic(topicName); - } catch (IllegalStateException ise) { - // this will happen if we were able to establish connection once, but - // its no longer valid, - // ise is thrown, catch it and retry. - LOG.error("Seems like connection is lost. Retrying", ise); - createConnection(); - topic = session.createTopic(topicName); - } - if (null == topic) { - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " - + topicName + " event: " + event); - return; - } - MessageProducer producer = session.createProducer(topic); - Message msg; - if (msgBody instanceof Map) { - MapMessage mapMsg = session.createMapMessage(); - Map incomingMap = (Map) msgBody; - for (Entry partCol : incomingMap.entrySet()) { - mapMsg.setString(partCol.getKey(), partCol.getValue()); - } - msg = mapMsg; - } else { - msg = session.createObjectMessage((Serializable) msgBody); - } - - msg.setStringProperty(HCatConstants.HCAT_EVENT, event); - producer.send(msg); - // Message must be transacted before we return. - session.commit(); - } catch (Exception e) { - // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: " + topicName + " event: " - + event, e); - } - } - - protected void createConnection() { - - Context jndiCntxt; - try { - jndiCntxt = new InitialContext(); - ConnectionFactory connFac = (ConnectionFactory) jndiCntxt - .lookup("ConnectionFactory"); - Connection conn = connFac.createConnection(); - conn.start(); - conn.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException jmse) { - LOG.error(jmse.toString()); - } - }); - // We want message to be sent when session commits, thus we run in - // transacted mode. - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } catch (NamingException e) { - LOG.error("JNDI error while setting up Message Bus connection. " - + "Please make sure file named 'jndi.properties' is in " - + "classpath and contains appropriate key-value pairs.", e); - } catch (JMSException e) { - LOG.error("Failed to initialize connection to message bus", e); - } catch (Throwable t) { - LOG.error("Unable to connect to JMS provider", t); - } - } - - @Override - protected void finalize() throws Throwable { - // Close the connection before dying. - try { - if (null != session) - session.close(); - if (conn != null) { - conn.close(); - } - - } catch (Exception ignore) { - LOG.info("Failed to close message bus connection.", ignore); - } - } - - @Override - public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) - throws MetaException { - if (lpde.getStatus()) - send( - lpde.getPartitionName(), - lpde.getTable().getParameters() - .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), - HCatConstants.HCAT_PARTITION_DONE_EVENT); - } - - @Override - public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { - // no-op - } - - @Override - public void onAlterTable(AlterTableEvent ate) throws MetaException { - // no-op - } -} diff --git src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java deleted file mode 100644 index 0db2b16..0000000 --- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.listener; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hcatalog.common.HCatConstants; - -public class TestMsgBusConnection extends TestCase { - - private Driver driver; - private BrokerService broker; - private MessageConsumer consumer; - - @Override - protected void setUp() throws Exception { - - super.setUp(); - broker = new BrokerService(); - // configure the broker - broker.addConnector("tcp://localhost:61616?broker.persistent=false"); - - broker.start(); - - System.setProperty("java.naming.factory.initial", - "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); - connectClient(); - HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, - NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } - - private void connectClient() throws JMSException { - ConnectionFactory connFac = new ActiveMQConnectionFactory( - "tcp://localhost:61616"); - Connection conn = connFac.createConnection(); - conn.start(); - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic("planetlab.hcat"); - consumer = session.createConsumer(hcatTopic); - } - - public void testConnection() throws Exception { - - try { - driver.run("create database testconndb"); - Message msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - broker.stop(); - driver.run("drop database testconndb cascade"); - broker.start(true); - connectClient(); - driver.run("create database testconndb"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - driver.run("drop database testconndb cascade"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } catch (NoSuchObjectException nsoe) { - nsoe.printStackTrace(System.err); - assert false; - } catch (AlreadyExistsException aee) { - aee.printStackTrace(System.err); - assert false; - } - } -} diff --git src/test/org/apache/hcatalog/listener/TestNotificationListener.java src/test/org/apache/hcatalog/listener/TestNotificationListener.java deleted file mode 100644 index 9a66427..0000000 --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.listener; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.mapreduce.HCatBaseTest; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TestNotificationListener extends HCatBaseTest implements MessageListener { - - private List actualMessages = new ArrayList(); - - @Before - public void setUp() throws Exception { - System.setProperty("java.naming.factory.initial", - "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", - "vm://localhost?broker.persistent=false"); - ConnectionFactory connFac = new ActiveMQConnectionFactory( - "vm://localhost?broker.persistent=false"); - Connection conn = connFac.createConnection(); - conn.start(); - // We want message to be sent when session commits, thus we run in - // transacted mode. - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session - .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - MessageConsumer consumer1 = session.createConsumer(hcatTopic); - consumer1.setMessageListener(this); - Destination tblTopic = session - .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); - MessageConsumer consumer2 = session.createConsumer(tblTopic); - consumer2.setMessageListener(this); - Destination dbTopic = session - .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); - MessageConsumer consumer3 = session.createConsumer(dbTopic); - consumer3.setMessageListener(this); - - setUpHiveConf(); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, - NotificationListener.class.getName()); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - client = new HiveMetaStoreClient(hiveConf); - } - - @After - public void tearDown() throws Exception { - List expectedMessages = Arrays.asList( - HCatConstants.HCAT_ADD_DATABASE_EVENT, - HCatConstants.HCAT_ADD_TABLE_EVENT, - HCatConstants.HCAT_ADD_PARTITION_EVENT, - HCatConstants.HCAT_PARTITION_DONE_EVENT, - HCatConstants.HCAT_DROP_PARTITION_EVENT, - HCatConstants.HCAT_DROP_TABLE_EVENT, - HCatConstants.HCAT_DROP_DATABASE_EVENT); - Assert.assertEquals(expectedMessages, actualMessages); - } - - @Test - public void testAMQListener() throws Exception { - driver.run("create database mydb"); - driver.run("use mydb"); - driver.run("create table mytbl (a string) partitioned by (b string)"); - driver.run("alter table mytbl add partition(b='2011')"); - Map kvs = new HashMap(1); - kvs.put("b", "2011"); - client.markPartitionForEvent("mydb", "mytbl", kvs, - PartitionEventType.LOAD_DONE); - driver.run("alter table mytbl drop partition(b='2011')"); - driver.run("drop table mytbl"); - driver.run("drop database mydb"); - } - - @Override - public void onMessage(Message msg) { - String event; - try { - event = msg.getStringProperty(HCatConstants.HCAT_EVENT); - actualMessages.add(event); - - if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { - - Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg - .getJMSDestination().toString()); - Assert.assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { - - Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", tbl.getTableName()); - Assert.assertEquals("mydb", tbl.getDbName()); - Assert.assertEquals(1, tbl.getPartitionKeysSize()); - } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { - - Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", part.getTableName()); - Assert.assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - Assert.assertEquals(vals, part.getValues()); - } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { - - Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", part.getTableName()); - Assert.assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - Assert.assertEquals(vals, part.getValues()); - } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { - - Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", tbl.getTableName()); - Assert.assertEquals("mydb", tbl.getDbName()); - Assert.assertEquals(1, tbl.getPartitionKeysSize()); - } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { - - Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg - .getJMSDestination().toString()); - Assert.assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - MapMessage mapMsg = (MapMessage) msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; - } catch (JMSException e) { - e.printStackTrace(System.err); - assert false; - } - } -} diff --git webhcat/java-client/build.xml webhcat/java-client/build.xml index fc1386c..eaa5347 100644 --- webhcat/java-client/build.xml +++ webhcat/java-client/build.xml @@ -18,6 +18,7 @@ + diff --git webhcat/svr/build.xml webhcat/svr/build.xml index a7bc34c..17fd93d 100644 --- webhcat/svr/build.xml +++ webhcat/svr/build.xml @@ -18,6 +18,7 @@ +