diff --git ant/deploy.xml ant/deploy.xml
index 84c3648..16e247d 100644
--- ant/deploy.xml
+++ ant/deploy.xml
@@ -20,13 +20,50 @@
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git build-common.xml build-common.xml
index 8740772..c5feb40 100644
--- build-common.xml
+++ build-common.xml
@@ -77,7 +77,7 @@
-
+
diff --git build.xml build.xml
index b7fc8a1..8eb723f 100644
--- build.xml
+++ build.xml
@@ -127,12 +127,6 @@
-
-
-
@@ -169,26 +163,12 @@
-
-
-
-
-
-
-
-
-
+
+
@@ -197,6 +177,7 @@
+
@@ -247,6 +228,7 @@
<_junit srcDir="${basedir}/src/test"/>
+
@@ -256,6 +238,7 @@
description="run unit tests and generate code coverage reports">
<_junit srcDir="${basedir}/src/test"/>
+
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git server-extensions/build.xml server-extensions/build.xml
new file mode 100644
index 0000000..f753ce2
--- /dev/null
+++ server-extensions/build.xml
@@ -0,0 +1,44 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git server-extensions/pom.xml server-extensions/pom.xml
new file mode 100644
index 0000000..ee974ad
--- /dev/null
+++ server-extensions/pom.xml
@@ -0,0 +1,83 @@
+
+
+
+
+
+ org.apache.hcatalog
+ hcatalog
+ ${hcatalog.version}
+ ../pom.xml
+
+
+ 4.0.0
+ org.apache.hcatalog
+ hcatalog-server-extensions
+ jar
+ ${hcatalog.version}
+ server-extensions
+ http://maven.apache.org
+
+
+
+ javax.jms
+ jms
+ ${jms.version}
+ compile
+
+
+ org.apache.activemq
+ activemq-core
+ ${activemq.version}
+ compile
+
+
+ org.springframework
+ spring-context
+
+
+
+
+ org.apache.activemq
+ kahadb
+ ${activemq.version}
+ compile
+
+
+ org.apache.hcatalog
+ hcatalog-core
+ ${hcatalog.version}
+ compile
+
+
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+ org.apache.hadoop
+ hadoop-test
+ ${hadoop20.version}
+ test
+
+
+
diff --git server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
new file mode 100644
index 0000000..5c6b8ad
--- /dev/null
+++ server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hcatalog.common.HCatConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of
+ * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends
+ * message on two type of topics. One has name of form dbName.tblName On this
+ * topic, two kind of messages are sent: add/drop partition and
+ * finalize_partition message. Second topic has name "HCAT" and messages sent on
+ * it are: add/drop database and add/drop table. All messages also has a
+ * property named "HCAT_EVENT" set on them whose value can be used to configure
+ * message selector on subscriber side.
+ */
+public class NotificationListener extends MetaStoreEventListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
+ protected Session session;
+ protected Connection conn;
+
+ /**
+ * Create message bus connection and session in constructor.
+ */
+ public NotificationListener(final Configuration conf) {
+
+ super(conf);
+ createConnection();
+ }
+
+ private static String getTopicName(Partition partition,
+ ListenerEvent partitionEvent) throws MetaException {
+ try {
+ return partitionEvent.getHandler()
+ .get_table(partition.getDbName(), partition.getTableName())
+ .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+ } catch (NoSuchObjectException e) {
+ throw new MetaException(e.toString());
+ }
+ }
+
+ @Override
+ public void onAddPartition(AddPartitionEvent partitionEvent)
+ throws MetaException {
+ // Subscriber can get notification of newly add partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
+ if (partitionEvent.getStatus()) {
+
+ Partition partition = partitionEvent.getPartition();
+ String topicName = getTopicName(partition, partitionEvent);
+ if (topicName != null && !topicName.equals("")) {
+ send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
+ } else {
+ LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+ + partition.getDbName()
+ + "."
+ + partition.getTableName()
+ + " To enable notifications for this table, please do alter table set properties ("
+ + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+ + "=.) or whatever you want topic name to be.");
+ }
+ }
+
+ }
+
+ /**
+ * Send dropped partition notifications. Subscribers can receive these notifications for a
+ * particular table by listening on a topic named "dbName.tableName" with message selector
+ * string {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+ * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}.
+ *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+ * found to throw NPE when serializing objects that contain null. For this reason we override
+ * some fields in the StorageDescriptor of this notification. This should be fixed after
+ * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+ */
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+ if (partitionEvent.getStatus()) {
+ Partition partition = partitionEvent.getPartition();
+ StorageDescriptor sd = partition.getSd();
+ sd.setBucketCols(new ArrayList());
+ sd.setSortCols(new ArrayList());
+ sd.setParameters(new HashMap());
+ sd.getSerdeInfo().setParameters(new HashMap());
+ sd.getSkewedInfo().setSkewedColNames(new ArrayList());
+ String topicName = getTopicName(partition, partitionEvent);
+ if (topicName != null && !topicName.equals("")) {
+ send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
+ } else {
+ LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+ + partition.getDbName()
+ + "."
+ + partition.getTableName()
+ + " To enable notifications for this table, please do alter table set properties ("
+ + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+ + "=.) or whatever you want topic name to be.");
+ }
+ }
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent)
+ throws MetaException {
+ // Subscriber can get notification about addition of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_DATABASE"
+ if (dbEvent.getStatus())
+ send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+ .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+ // Subscriber can get notification about drop of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_DATABASE"
+ if (dbEvent.getStatus())
+ send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+ .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+ // Subscriber can get notification about addition of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_TABLE"
+ if (tableEvent.getStatus()) {
+ Table tbl = tableEvent.getTable();
+ HMSHandler handler = tableEvent.getHandler();
+ HiveConf conf = handler.getHiveConf();
+ Table newTbl;
+ try {
+ newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
+ .deepCopy();
+ newTbl.getParameters().put(
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
+ getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
+ + newTbl.getTableName().toLowerCase());
+ handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
+ } catch (InvalidOperationException e) {
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ throw me;
+ } catch (NoSuchObjectException e) {
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ throw me;
+ }
+ send(newTbl, getTopicPrefix(conf) + "."
+ + newTbl.getDbName().toLowerCase(),
+ HCatConstants.HCAT_ADD_TABLE_EVENT);
+ }
+ }
+
+ private String getTopicPrefix(HiveConf conf) {
+ return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
+ HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+ }
+
+ /**
+ * Send dropped table notifications. Subscribers can receive these notifications for
+ * dropped tables by listening on topic "HCAT" with message selector string
+ * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+ * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT}
+ *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+ * found to throw NPE when serializing objects that contain null. For this reason we override
+ * some fields in the StorageDescriptor of this notification. This should be fixed after
+ * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+ */
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+ // Subscriber can get notification about drop of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_TABLE"
+
+ // Datanucleus throws NPE when we try to serialize a table object
+ // retrieved from metastore. To workaround that we reset following objects
+
+ if (tableEvent.getStatus()) {
+ Table table = tableEvent.getTable();
+ StorageDescriptor sd = table.getSd();
+ sd.setBucketCols(new ArrayList());
+ sd.setSortCols(new ArrayList());
+ sd.setParameters(new HashMap());
+ sd.getSerdeInfo().setParameters(new HashMap());
+ sd.getSkewedInfo().setSkewedColNames(new ArrayList());
+ send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
+ + table.getDbName().toLowerCase(),
+ HCatConstants.HCAT_DROP_TABLE_EVENT);
+ }
+ }
+
+ /**
+ * @param msgBody
+ * is the metastore object. It is sent in full such that if
+ * subscriber is really interested in details, it can reconstruct it
+ * fully. In case of finalize_partition message this will be string
+ * specification of the partition.
+ * @param topicName
+ * is the name on message broker on which message is sent.
+ * @param event
+ * is the value of HCAT_EVENT property in message. It can be used to
+ * select messages in client side.
+ */
+ protected void send(Object msgBody, String topicName, String event) {
+
+ try {
+
+ Destination topic = null;
+ if (null == session) {
+ // this will happen, if we never able to establish a connection.
+ createConnection();
+ if (null == session) {
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "
+ + topicName + " event: " + event);
+ return;
+ }
+ }
+ try {
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ topic = session.createTopic(topicName);
+ } catch (IllegalStateException ise) {
+ // this will happen if we were able to establish connection once, but
+ // its no longer valid,
+ // ise is thrown, catch it and retry.
+ LOG.error("Seems like connection is lost. Retrying", ise);
+ createConnection();
+ topic = session.createTopic(topicName);
+ }
+ if (null == topic) {
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "
+ + topicName + " event: " + event);
+ return;
+ }
+ MessageProducer producer = session.createProducer(topic);
+ Message msg;
+ if (msgBody instanceof Map) {
+ MapMessage mapMsg = session.createMapMessage();
+ Map incomingMap = (Map) msgBody;
+ for (Entry partCol : incomingMap.entrySet()) {
+ mapMsg.setString(partCol.getKey(), partCol.getValue());
+ }
+ msg = mapMsg;
+ } else {
+ msg = session.createObjectMessage((Serializable) msgBody);
+ }
+
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+ producer.send(msg);
+ // Message must be transacted before we return.
+ session.commit();
+ } catch (Exception e) {
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: " + topicName + " event: "
+ + event, e);
+ }
+ }
+
+ protected void createConnection() {
+
+ Context jndiCntxt;
+ try {
+ jndiCntxt = new InitialContext();
+ ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
+ .lookup("ConnectionFactory");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ conn.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException jmse) {
+ LOG.error(jmse.toString());
+ }
+ });
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ } catch (NamingException e) {
+ LOG.error("JNDI error while setting up Message Bus connection. "
+ + "Please make sure file named 'jndi.properties' is in "
+ + "classpath and contains appropriate key-value pairs.", e);
+ } catch (JMSException e) {
+ LOG.error("Failed to initialize connection to message bus", e);
+ } catch (Throwable t) {
+ LOG.error("Unable to connect to JMS provider", t);
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ // Close the connection before dying.
+ try {
+ if (null != session)
+ session.close();
+ if (conn != null) {
+ conn.close();
+ }
+
+ } catch (Exception ignore) {
+ LOG.info("Failed to close message bus connection.", ignore);
+ }
+ }
+
+ @Override
+ public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+ throws MetaException {
+ if (lpde.getStatus())
+ send(
+ lpde.getPartitionName(),
+ lpde.getTable().getParameters()
+ .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
+ HCatConstants.HCAT_PARTITION_DONE_EVENT);
+ }
+
+ @Override
+ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onAlterTable(AlterTableEvent ate) throws MetaException {
+ // no-op
+ }
+}
diff --git server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java
new file mode 100644
index 0000000..0db2b16
--- /dev/null
+++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+
+public class TestMsgBusConnection extends TestCase {
+
+ private Driver driver;
+ private BrokerService broker;
+ private MessageConsumer consumer;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ broker = new BrokerService();
+ // configure the broker
+ broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+
+ broker.start();
+
+ System.setProperty("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
+ connectClient();
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ private void connectClient() throws JMSException {
+ ConnectionFactory connFac = new ActiveMQConnectionFactory(
+ "tcp://localhost:61616");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session.createTopic("planetlab.hcat");
+ consumer = session.createConsumer(hcatTopic);
+ }
+
+ public void testConnection() throws Exception {
+
+ try {
+ driver.run("create database testconndb");
+ Message msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ broker.stop();
+ driver.run("drop database testconndb cascade");
+ broker.start(true);
+ connectClient();
+ driver.run("create database testconndb");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ driver.run("drop database testconndb cascade");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } catch (NoSuchObjectException nsoe) {
+ nsoe.printStackTrace(System.err);
+ assert false;
+ } catch (AlreadyExistsException aee) {
+ aee.printStackTrace(System.err);
+ assert false;
+ }
+ }
+}
diff --git server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java
new file mode 100644
index 0000000..9a66427
--- /dev/null
+++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNotificationListener extends HCatBaseTest implements MessageListener {
+
+ private List actualMessages = new ArrayList();
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url",
+ "vm://localhost?broker.persistent=false");
+ ConnectionFactory connFac = new ActiveMQConnectionFactory(
+ "vm://localhost?broker.persistent=false");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+ MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+ consumer1.setMessageListener(this);
+ Destination tblTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl");
+ MessageConsumer consumer2 = session.createConsumer(tblTopic);
+ consumer2.setMessageListener(this);
+ Destination dbTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb");
+ MessageConsumer consumer3 = session.createConsumer(dbTopic);
+ consumer3.setMessageListener(this);
+
+ setUpHiveConf();
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ NotificationListener.class.getName());
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ client = new HiveMetaStoreClient(hiveConf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ List expectedMessages = Arrays.asList(
+ HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ HCatConstants.HCAT_ADD_TABLE_EVENT,
+ HCatConstants.HCAT_ADD_PARTITION_EVENT,
+ HCatConstants.HCAT_PARTITION_DONE_EVENT,
+ HCatConstants.HCAT_DROP_PARTITION_EVENT,
+ HCatConstants.HCAT_DROP_TABLE_EVENT,
+ HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ Assert.assertEquals(expectedMessages, actualMessages);
+ }
+
+ @Test
+ public void testAMQListener() throws Exception {
+ driver.run("create database mydb");
+ driver.run("use mydb");
+ driver.run("create table mytbl (a string) partitioned by (b string)");
+ driver.run("alter table mytbl add partition(b='2011')");
+ Map kvs = new HashMap(1);
+ kvs.put("b", "2011");
+ client.markPartitionForEvent("mydb", "mytbl", kvs,
+ PartitionEventType.LOAD_DONE);
+ driver.run("alter table mytbl drop partition(b='2011')");
+ driver.run("drop table mytbl");
+ driver.run("drop database mydb");
+ }
+
+ @Override
+ public void onMessage(Message msg) {
+ String event;
+ try {
+ event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+ actualMessages.add(event);
+
+ if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
+
+ Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ .getJMSDestination().toString());
+ Assert.assertEquals("mydb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
+
+ Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+ Table tbl = (Table) (((ObjectMessage) msg).getObject());
+ Assert.assertEquals("mytbl", tbl.getTableName());
+ Assert.assertEquals("mydb", tbl.getDbName());
+ Assert.assertEquals(1, tbl.getPartitionKeysSize());
+ } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
+
+ Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ Partition part = (Partition) (((ObjectMessage) msg).getObject());
+ Assert.assertEquals("mytbl", part.getTableName());
+ Assert.assertEquals("mydb", part.getDbName());
+ List vals = new ArrayList(1);
+ vals.add("2011");
+ Assert.assertEquals(vals, part.getValues());
+ } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
+
+ Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ Partition part = (Partition) (((ObjectMessage) msg).getObject());
+ Assert.assertEquals("mytbl", part.getTableName());
+ Assert.assertEquals("mydb", part.getDbName());
+ List vals = new ArrayList(1);
+ vals.add("2011");
+ Assert.assertEquals(vals, part.getValues());
+ } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
+
+ Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+ Table tbl = (Table) (((ObjectMessage) msg).getObject());
+ Assert.assertEquals("mytbl", tbl.getTableName());
+ Assert.assertEquals("mydb", tbl.getDbName());
+ Assert.assertEquals(1, tbl.getPartitionKeysSize());
+ } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
+
+ Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ .getJMSDestination().toString());
+ Assert.assertEquals("mydb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+ Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ MapMessage mapMsg = (MapMessage) msg;
+ assert mapMsg.getString("b").equals("2011");
+ } else
+ assert false;
+ } catch (JMSException e) {
+ e.printStackTrace(System.err);
+ assert false;
+ }
+ }
+}
diff --git src/java/org/apache/hcatalog/listener/NotificationListener.java src/java/org/apache/hcatalog/listener/NotificationListener.java
deleted file mode 100644
index 5c6b8ad..0000000
--- src/java/org/apache/hcatalog/listener/NotificationListener.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.listener;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
-import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
-import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
-import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
-import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.DropTableEvent;
-import org.apache.hadoop.hive.metastore.events.ListenerEvent;
-import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
-import org.apache.hcatalog.common.HCatConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of
- * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends
- * message on two type of topics. One has name of form dbName.tblName On this
- * topic, two kind of messages are sent: add/drop partition and
- * finalize_partition message. Second topic has name "HCAT" and messages sent on
- * it are: add/drop database and add/drop table. All messages also has a
- * property named "HCAT_EVENT" set on them whose value can be used to configure
- * message selector on subscriber side.
- */
-public class NotificationListener extends MetaStoreEventListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
- protected Session session;
- protected Connection conn;
-
- /**
- * Create message bus connection and session in constructor.
- */
- public NotificationListener(final Configuration conf) {
-
- super(conf);
- createConnection();
- }
-
- private static String getTopicName(Partition partition,
- ListenerEvent partitionEvent) throws MetaException {
- try {
- return partitionEvent.getHandler()
- .get_table(partition.getDbName(), partition.getTableName())
- .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
- } catch (NoSuchObjectException e) {
- throw new MetaException(e.toString());
- }
- }
-
- @Override
- public void onAddPartition(AddPartitionEvent partitionEvent)
- throws MetaException {
- // Subscriber can get notification of newly add partition in a
- // particular table by listening on a topic named "dbName.tableName"
- // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
- if (partitionEvent.getStatus()) {
-
- Partition partition = partitionEvent.getPartition();
- String topicName = getTopicName(partition, partitionEvent);
- if (topicName != null && !topicName.equals("")) {
- send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
- } else {
- LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
- + partition.getDbName()
- + "."
- + partition.getTableName()
- + " To enable notifications for this table, please do alter table set properties ("
- + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
- + "=.) or whatever you want topic name to be.");
- }
- }
-
- }
-
- /**
- * Send dropped partition notifications. Subscribers can receive these notifications for a
- * particular table by listening on a topic named "dbName.tableName" with message selector
- * string {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
- * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}.
- *
- * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
- * found to throw NPE when serializing objects that contain null. For this reason we override
- * some fields in the StorageDescriptor of this notification. This should be fixed after
- * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
- */
- @Override
- public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
- if (partitionEvent.getStatus()) {
- Partition partition = partitionEvent.getPartition();
- StorageDescriptor sd = partition.getSd();
- sd.setBucketCols(new ArrayList());
- sd.setSortCols(new ArrayList());
- sd.setParameters(new HashMap());
- sd.getSerdeInfo().setParameters(new HashMap());
- sd.getSkewedInfo().setSkewedColNames(new ArrayList());
- String topicName = getTopicName(partition, partitionEvent);
- if (topicName != null && !topicName.equals("")) {
- send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
- } else {
- LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
- + partition.getDbName()
- + "."
- + partition.getTableName()
- + " To enable notifications for this table, please do alter table set properties ("
- + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
- + "=.) or whatever you want topic name to be.");
- }
- }
- }
-
- @Override
- public void onCreateDatabase(CreateDatabaseEvent dbEvent)
- throws MetaException {
- // Subscriber can get notification about addition of a database in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_ADD_DATABASE"
- if (dbEvent.getStatus())
- send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
- .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
- }
-
- @Override
- public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
- // Subscriber can get notification about drop of a database in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_DROP_DATABASE"
- if (dbEvent.getStatus())
- send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
- .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
- }
-
- @Override
- public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
- // Subscriber can get notification about addition of a table in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_ADD_TABLE"
- if (tableEvent.getStatus()) {
- Table tbl = tableEvent.getTable();
- HMSHandler handler = tableEvent.getHandler();
- HiveConf conf = handler.getHiveConf();
- Table newTbl;
- try {
- newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
- .deepCopy();
- newTbl.getParameters().put(
- HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
- getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
- + newTbl.getTableName().toLowerCase());
- handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
- } catch (InvalidOperationException e) {
- MetaException me = new MetaException(e.toString());
- me.initCause(e);
- throw me;
- } catch (NoSuchObjectException e) {
- MetaException me = new MetaException(e.toString());
- me.initCause(e);
- throw me;
- }
- send(newTbl, getTopicPrefix(conf) + "."
- + newTbl.getDbName().toLowerCase(),
- HCatConstants.HCAT_ADD_TABLE_EVENT);
- }
- }
-
- private String getTopicPrefix(HiveConf conf) {
- return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
- HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
- }
-
- /**
- * Send dropped table notifications. Subscribers can receive these notifications for
- * dropped tables by listening on topic "HCAT" with message selector string
- * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
- * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT}
- *
- * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
- * found to throw NPE when serializing objects that contain null. For this reason we override
- * some fields in the StorageDescriptor of this notification. This should be fixed after
- * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
- */
- @Override
- public void onDropTable(DropTableEvent tableEvent) throws MetaException {
- // Subscriber can get notification about drop of a table in HCAT
- // by listening on a topic named "HCAT" and message selector string
- // as "HCAT_EVENT = HCAT_DROP_TABLE"
-
- // Datanucleus throws NPE when we try to serialize a table object
- // retrieved from metastore. To workaround that we reset following objects
-
- if (tableEvent.getStatus()) {
- Table table = tableEvent.getTable();
- StorageDescriptor sd = table.getSd();
- sd.setBucketCols(new ArrayList());
- sd.setSortCols(new ArrayList());
- sd.setParameters(new HashMap());
- sd.getSerdeInfo().setParameters(new HashMap());
- sd.getSkewedInfo().setSkewedColNames(new ArrayList());
- send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
- + table.getDbName().toLowerCase(),
- HCatConstants.HCAT_DROP_TABLE_EVENT);
- }
- }
-
- /**
- * @param msgBody
- * is the metastore object. It is sent in full such that if
- * subscriber is really interested in details, it can reconstruct it
- * fully. In case of finalize_partition message this will be string
- * specification of the partition.
- * @param topicName
- * is the name on message broker on which message is sent.
- * @param event
- * is the value of HCAT_EVENT property in message. It can be used to
- * select messages in client side.
- */
- protected void send(Object msgBody, String topicName, String event) {
-
- try {
-
- Destination topic = null;
- if (null == session) {
- // this will happen, if we never able to establish a connection.
- createConnection();
- if (null == session) {
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: "
- + topicName + " event: " + event);
- return;
- }
- }
- try {
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise) {
- // this will happen if we were able to establish connection once, but
- // its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
- }
- if (null == topic) {
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: "
- + topicName + " event: " + event);
- return;
- }
- MessageProducer producer = session.createProducer(topic);
- Message msg;
- if (msgBody instanceof Map) {
- MapMessage mapMsg = session.createMapMessage();
- Map incomingMap = (Map) msgBody;
- for (Entry partCol : incomingMap.entrySet()) {
- mapMsg.setString(partCol.getKey(), partCol.getValue());
- }
- msg = mapMsg;
- } else {
- msg = session.createObjectMessage((Serializable) msgBody);
- }
-
- msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
- producer.send(msg);
- // Message must be transacted before we return.
- session.commit();
- } catch (Exception e) {
- // Gobble up the exception. Message delivery is best effort.
- LOG.error("Failed to send message on topic: " + topicName + " event: "
- + event, e);
- }
- }
-
- protected void createConnection() {
-
- Context jndiCntxt;
- try {
- jndiCntxt = new InitialContext();
- ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
- .lookup("ConnectionFactory");
- Connection conn = connFac.createConnection();
- conn.start();
- conn.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException jmse) {
- LOG.error(jmse.toString());
- }
- });
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
- } catch (NamingException e) {
- LOG.error("JNDI error while setting up Message Bus connection. "
- + "Please make sure file named 'jndi.properties' is in "
- + "classpath and contains appropriate key-value pairs.", e);
- } catch (JMSException e) {
- LOG.error("Failed to initialize connection to message bus", e);
- } catch (Throwable t) {
- LOG.error("Unable to connect to JMS provider", t);
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- // Close the connection before dying.
- try {
- if (null != session)
- session.close();
- if (conn != null) {
- conn.close();
- }
-
- } catch (Exception ignore) {
- LOG.info("Failed to close message bus connection.", ignore);
- }
- }
-
- @Override
- public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
- throws MetaException {
- if (lpde.getStatus())
- send(
- lpde.getPartitionName(),
- lpde.getTable().getParameters()
- .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
- HCatConstants.HCAT_PARTITION_DONE_EVENT);
- }
-
- @Override
- public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
- // no-op
- }
-
- @Override
- public void onAlterTable(AlterTableEvent ate) throws MetaException {
- // no-op
- }
-}
diff --git src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
deleted file mode 100644
index 0db2b16..0000000
--- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.listener;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hcatalog.common.HCatConstants;
-
-public class TestMsgBusConnection extends TestCase {
-
- private Driver driver;
- private BrokerService broker;
- private MessageConsumer consumer;
-
- @Override
- protected void setUp() throws Exception {
-
- super.setUp();
- broker = new BrokerService();
- // configure the broker
- broker.addConnector("tcp://localhost:61616?broker.persistent=false");
-
- broker.start();
-
- System.setProperty("java.naming.factory.initial",
- "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
- connectClient();
- HiveConf hiveConf = new HiveConf(this.getClass());
- hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
- NotificationListener.class.getName());
- hiveConf.set("hive.metastore.local", "true");
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
- SessionState.start(new CliSessionState(hiveConf));
- driver = new Driver(hiveConf);
- }
-
- private void connectClient() throws JMSException {
- ConnectionFactory connFac = new ActiveMQConnectionFactory(
- "tcp://localhost:61616");
- Connection conn = connFac.createConnection();
- conn.start();
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Destination hcatTopic = session.createTopic("planetlab.hcat");
- consumer = session.createConsumer(hcatTopic);
- }
-
- public void testConnection() throws Exception {
-
- try {
- driver.run("create database testconndb");
- Message msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
- msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
- assertEquals("testconndb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- broker.stop();
- driver.run("drop database testconndb cascade");
- broker.start(true);
- connectClient();
- driver.run("create database testconndb");
- msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
- msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
- assertEquals("testconndb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- driver.run("drop database testconndb cascade");
- msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
- msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
- assertEquals("testconndb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- } catch (NoSuchObjectException nsoe) {
- nsoe.printStackTrace(System.err);
- assert false;
- } catch (AlreadyExistsException aee) {
- aee.printStackTrace(System.err);
- assert false;
- }
- }
-}
diff --git src/test/org/apache/hcatalog/listener/TestNotificationListener.java src/test/org/apache/hcatalog/listener/TestNotificationListener.java
deleted file mode 100644
index 9a66427..0000000
--- src/test/org/apache/hcatalog/listener/TestNotificationListener.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.listener;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PartitionEventType;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.mapreduce.HCatBaseTest;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestNotificationListener extends HCatBaseTest implements MessageListener {
-
- private List actualMessages = new ArrayList();
-
- @Before
- public void setUp() throws Exception {
- System.setProperty("java.naming.factory.initial",
- "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- System.setProperty("java.naming.provider.url",
- "vm://localhost?broker.persistent=false");
- ConnectionFactory connFac = new ActiveMQConnectionFactory(
- "vm://localhost?broker.persistent=false");
- Connection conn = connFac.createConnection();
- conn.start();
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Destination hcatTopic = session
- .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
- MessageConsumer consumer1 = session.createConsumer(hcatTopic);
- consumer1.setMessageListener(this);
- Destination tblTopic = session
- .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl");
- MessageConsumer consumer2 = session.createConsumer(tblTopic);
- consumer2.setMessageListener(this);
- Destination dbTopic = session
- .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb");
- MessageConsumer consumer3 = session.createConsumer(dbTopic);
- consumer3.setMessageListener(this);
-
- setUpHiveConf();
- hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
- NotificationListener.class.getName());
- SessionState.start(new CliSessionState(hiveConf));
- driver = new Driver(hiveConf);
- client = new HiveMetaStoreClient(hiveConf);
- }
-
- @After
- public void tearDown() throws Exception {
- List expectedMessages = Arrays.asList(
- HCatConstants.HCAT_ADD_DATABASE_EVENT,
- HCatConstants.HCAT_ADD_TABLE_EVENT,
- HCatConstants.HCAT_ADD_PARTITION_EVENT,
- HCatConstants.HCAT_PARTITION_DONE_EVENT,
- HCatConstants.HCAT_DROP_PARTITION_EVENT,
- HCatConstants.HCAT_DROP_TABLE_EVENT,
- HCatConstants.HCAT_DROP_DATABASE_EVENT);
- Assert.assertEquals(expectedMessages, actualMessages);
- }
-
- @Test
- public void testAMQListener() throws Exception {
- driver.run("create database mydb");
- driver.run("use mydb");
- driver.run("create table mytbl (a string) partitioned by (b string)");
- driver.run("alter table mytbl add partition(b='2011')");
- Map kvs = new HashMap(1);
- kvs.put("b", "2011");
- client.markPartitionForEvent("mydb", "mytbl", kvs,
- PartitionEventType.LOAD_DONE);
- driver.run("alter table mytbl drop partition(b='2011')");
- driver.run("drop table mytbl");
- driver.run("drop database mydb");
- }
-
- @Override
- public void onMessage(Message msg) {
- String event;
- try {
- event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
- actualMessages.add(event);
-
- if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
-
- Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
- .getJMSDestination().toString());
- Assert.assertEquals("mydb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
-
- Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
- Table tbl = (Table) (((ObjectMessage) msg).getObject());
- Assert.assertEquals("mytbl", tbl.getTableName());
- Assert.assertEquals("mydb", tbl.getDbName());
- Assert.assertEquals(1, tbl.getPartitionKeysSize());
- } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
-
- Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
- .toString());
- Partition part = (Partition) (((ObjectMessage) msg).getObject());
- Assert.assertEquals("mytbl", part.getTableName());
- Assert.assertEquals("mydb", part.getDbName());
- List vals = new ArrayList(1);
- vals.add("2011");
- Assert.assertEquals(vals, part.getValues());
- } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
-
- Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
- .toString());
- Partition part = (Partition) (((ObjectMessage) msg).getObject());
- Assert.assertEquals("mytbl", part.getTableName());
- Assert.assertEquals("mydb", part.getDbName());
- List vals = new ArrayList(1);
- vals.add("2011");
- Assert.assertEquals(vals, part.getValues());
- } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
-
- Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
- Table tbl = (Table) (((ObjectMessage) msg).getObject());
- Assert.assertEquals("mytbl", tbl.getTableName());
- Assert.assertEquals("mydb", tbl.getDbName());
- Assert.assertEquals(1, tbl.getPartitionKeysSize());
- } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
-
- Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
- .getJMSDestination().toString());
- Assert.assertEquals("mydb",
- ((Database) ((ObjectMessage) msg).getObject()).getName());
- } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
- Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
- .toString());
- MapMessage mapMsg = (MapMessage) msg;
- assert mapMsg.getString("b").equals("2011");
- } else
- assert false;
- } catch (JMSException e) {
- e.printStackTrace(System.err);
- assert false;
- }
- }
-}
diff --git webhcat/java-client/build.xml webhcat/java-client/build.xml
index fc1386c..eaa5347 100644
--- webhcat/java-client/build.xml
+++ webhcat/java-client/build.xml
@@ -18,6 +18,7 @@
+
diff --git webhcat/svr/build.xml webhcat/svr/build.xml
index a7bc34c..17fd93d 100644
--- webhcat/svr/build.xml
+++ webhcat/svr/build.xml
@@ -18,6 +18,7 @@
+