diff --git build.xml build.xml index 4da0cff..6ee6ace 100644 --- build.xml +++ build.xml @@ -104,6 +104,7 @@ + @@ -127,12 +128,6 @@ - - - @@ -180,25 +175,12 @@ - - - - - - - - - + + @@ -285,6 +270,7 @@ classPathRef="compile.classpath"/> + @@ -299,6 +285,7 @@ + @@ -336,6 +323,7 @@ failonerror="true"> + @@ -382,6 +370,9 @@ + + + @@ -468,6 +459,7 @@ + diff --git ivy.xml ivy.xml index 703f6ec..5b9d8b7 100644 --- ivy.xml +++ ivy.xml @@ -37,11 +37,6 @@ - - - - - diff --git server-extensions/build.xml server-extensions/build.xml new file mode 100644 index 0000000..4e3defa --- /dev/null +++ server-extensions/build.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git server-extensions/ivy.xml server-extensions/ivy.xml new file mode 100644 index 0000000..8f5d2fd --- /dev/null +++ server-extensions/ivy.xml @@ -0,0 +1,45 @@ + + + + + + + Apache HCatalog + + + + + + + + + + + + + + + + + + + + diff --git server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java new file mode 100644 index 0000000..63775f7 --- /dev/null +++ server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hcatalog.common.HCatConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of + * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends + * message on two type of topics. One has name of form dbName.tblName On this + * topic, two kind of messages are sent: add/drop partition and + * finalize_partition message. Second topic has name "HCAT" and messages sent on + * it are: add/drop database and add/drop table. All messages also has a + * property named "HCAT_EVENT" set on them whose value can be used to configure + * message selector on subscriber side. + */ +public class NotificationListener extends MetaStoreEventListener { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); + protected Session session; + protected Connection conn; + + /** + * Create message bus connection and session in constructor. + */ + public NotificationListener(final Configuration conf) { + + super(conf); + createConnection(); + } + + private static String getTopicName(Partition partition, + ListenerEvent partitionEvent) throws MetaException { + try { + return partitionEvent.getHandler() + .get_table(partition.getDbName(), partition.getTableName()) + .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); + } catch (NoSuchObjectException e) { + throw new MetaException(e.toString()); + } + } + + @Override + public void onAddPartition(AddPartitionEvent partitionEvent) + throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + if (partitionEvent.getStatus()) { + + Partition partition = partitionEvent.getPartition(); + String topicName = getTopicName(partition, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); + } else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + + partition.getDbName() + + "." + + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + + } + + @Override + public void onDropPartition(DropPartitionEvent partitionEvent) + throws MetaException { + // Subscriber can get notification of dropped partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" + + // Datanucleus throws NPE when we try to serialize a partition object + // retrieved from metastore. To workaround that we reset following objects + + if (partitionEvent.getStatus()) { + Partition partition = partitionEvent.getPartition(); + StorageDescriptor sd = partition.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + String topicName = getTopicName(partition, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); + } else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + + partition.getDbName() + + "." + + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + } + + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) + throws MetaException { + // Subscriber can get notification about addition of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_DATABASE" + if (dbEvent.getStatus()) + send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() + .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT); + } + + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about drop of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_DATABASE" + if (dbEvent.getStatus()) + send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() + .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT); + } + + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about addition of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_TABLE" + if (tableEvent.getStatus()) { + Table tbl = tableEvent.getTable(); + HMSHandler handler = tableEvent.getHandler(); + HiveConf conf = handler.getHiveConf(); + Table newTbl; + try { + newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) + .deepCopy(); + newTbl.getParameters().put( + HCatConstants.HCAT_MSGBUS_TOPIC_NAME, + getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." + + newTbl.getTableName().toLowerCase()); + handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); + } catch (InvalidOperationException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } catch (NoSuchObjectException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } + send(newTbl, getTopicPrefix(conf) + "." + + newTbl.getDbName().toLowerCase(), + HCatConstants.HCAT_ADD_TABLE_EVENT); + } + } + + private String getTopicPrefix(HiveConf conf) { + return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + } + + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about drop of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_TABLE" + + // Datanucleus throws NPE when we try to serialize a table object + // retrieved from metastore. To workaround that we reset following objects + + if (tableEvent.getStatus()) { + Table table = tableEvent.getTable(); + StorageDescriptor sd = table.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + + table.getDbName().toLowerCase(), + HCatConstants.HCAT_DROP_TABLE_EVENT); + } + } + + /** + * @param msgBody + * is the metastore object. It is sent in full such that if + * subscriber is really interested in details, it can reconstruct it + * fully. In case of finalize_partition message this will be string + * specification of the partition. + * @param topicName + * is the name on message broker on which message is sent. + * @param event + * is the value of HCAT_EVENT property in message. It can be used to + * select messages in client side. + */ + protected void send(Object msgBody, String topicName, String event) { + + try { + + Destination topic = null; + if (null == session) { + // this will happen, if we never able to establish a connection. + createConnection(); + if (null == session) { + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + event); + return; + } + } + try { + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise) { + // this will happen if we were able to establish connection once, but + // its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + if (null == topic) { + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + event); + return; + } + MessageProducer producer = session.createProducer(topic); + Message msg; + if (msgBody instanceof Map) { + MapMessage mapMsg = session.createMapMessage(); + Map incomingMap = (Map) msgBody; + for (Entry partCol : incomingMap.entrySet()) { + mapMsg.setString(partCol.getKey(), partCol.getValue()); + } + msg = mapMsg; + } else { + msg = session.createObjectMessage((Serializable) msgBody); + } + + msg.setStringProperty(HCatConstants.HCAT_EVENT, event); + producer.send(msg); + // Message must be transacted before we return. + session.commit(); + } catch (Exception e) { + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: " + topicName + " event: " + + event, e); + } + } + + protected void createConnection() { + + Context jndiCntxt; + try { + jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory) jndiCntxt + .lookup("ConnectionFactory"); + Connection conn = connFac.createConnection(); + conn.start(); + conn.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException jmse) { + LOG.error(jmse.toString()); + } + }); + // We want message to be sent when session commits, thus we run in + // transacted mode. + session = conn.createSession(true, Session.SESSION_TRANSACTED); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.", e); + } catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus", e); + } catch (Throwable t) { + LOG.error("Unable to connect to JMS provider", t); + } + } + + @Override + protected void finalize() throws Throwable { + // Close the connection before dying. + try { + if (null != session) + session.close(); + if (conn != null) { + conn.close(); + } + + } catch (Exception ignore) { + LOG.info("Failed to close message bus connection.", ignore); + } + } + + @Override + public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) + throws MetaException { + if (lpde.getStatus()) + send( + lpde.getPartitionName(), + lpde.getTable().getParameters() + .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), + HCatConstants.HCAT_PARTITION_DONE_EVENT); + } + + @Override + public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { + // no-op + } + + @Override + public void onAlterTable(AlterTableEvent ate) throws MetaException { + // no-op + } +} \ No newline at end of file diff --git server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java new file mode 100644 index 0000000..4da3cef --- /dev/null +++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; + +public class TestMsgBusConnection extends TestCase { + + private Driver driver; + private BrokerService broker; + private MessageConsumer consumer; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + broker = new BrokerService(); + // configure the broker + broker.addConnector("tcp://localhost:61616?broker.persistent=false"); + + broker.start(); + + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + private void connectClient() throws JMSException { + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "tcp://localhost:61616"); + Connection conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic("planetlab.hcat"); + consumer = session.createConsumer(hcatTopic); + } + + public void testConnection() throws Exception { + + try { + driver.run("create database testconndb"); + Message msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + broker.stop(); + driver.run("drop database testconndb cascade"); + broker.start(true); + connectClient(); + driver.run("create database testconndb"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } catch (NoSuchObjectException nsoe) { + nsoe.printStackTrace(System.err); + assert false; + } catch (AlreadyExistsException aee) { + aee.printStackTrace(System.err); + assert false; + } + } +} diff --git server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java new file mode 100644 index 0000000..7d09d79 --- /dev/null +++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.thrift.TException; + +import junit.framework.TestCase; + +public class TestNotificationListener extends TestCase implements + MessageListener { + + private HiveConf hiveConf; + private Driver driver; + private AtomicInteger cntInvocation = new AtomicInteger(0); + + @Override + protected void setUp() throws Exception { + + super.setUp(); + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", + "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Destination dbTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); + MessageConsumer consumer3 = session.createConsumer(dbTopic); + consumer3.setMessageListener(this); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + @Override + protected void tearDown() throws Exception { + assertEquals(7, cntInvocation.get()); + super.tearDown(); + } + + public void testAMQListener() throws MetaException, TException, + UnknownTableException, NoSuchObjectException, CommandNeedRetryException, + UnknownDBException, InvalidPartitionException, UnknownPartitionException { + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); + Map kvs = new HashMap(1); + kvs.put("b", "2011"); + msc.markPartitionForEvent("mydb", "mytbl", kvs, + PartitionEventType.LOAD_DONE); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } + + @Override + public void onMessage(Message msg) { + cntInvocation.incrementAndGet(); + + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { + + assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { + + assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { + + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { + + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { + + assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { + + assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + MapMessage mapMsg = (MapMessage) msg; + assert mapMsg.getString("b").equals("2011"); + } else + assert false; + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + } + } +} diff --git src/java/org/apache/hcatalog/listener/NotificationListener.java src/java/org/apache/hcatalog/listener/NotificationListener.java deleted file mode 100644 index 63775f7..0000000 --- src/java/org/apache/hcatalog/listener/NotificationListener.java +++ /dev/null @@ -1,373 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.listener; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; -import org.apache.hadoop.hive.metastore.MetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; -import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; -import org.apache.hadoop.hive.metastore.events.CreateTableEvent; -import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; -import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; -import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; -import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; -import org.apache.hcatalog.common.HCatConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of - * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends - * message on two type of topics. One has name of form dbName.tblName On this - * topic, two kind of messages are sent: add/drop partition and - * finalize_partition message. Second topic has name "HCAT" and messages sent on - * it are: add/drop database and add/drop table. All messages also has a - * property named "HCAT_EVENT" set on them whose value can be used to configure - * message selector on subscriber side. - */ -public class NotificationListener extends MetaStoreEventListener { - - private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); - protected Session session; - protected Connection conn; - - /** - * Create message bus connection and session in constructor. - */ - public NotificationListener(final Configuration conf) { - - super(conf); - createConnection(); - } - - private static String getTopicName(Partition partition, - ListenerEvent partitionEvent) throws MetaException { - try { - return partitionEvent.getHandler() - .get_table(partition.getDbName(), partition.getTableName()) - .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); - } catch (NoSuchObjectException e) { - throw new MetaException(e.toString()); - } - } - - @Override - public void onAddPartition(AddPartitionEvent partitionEvent) - throws MetaException { - // Subscriber can get notification of newly add partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" - if (partitionEvent.getStatus()) { - - Partition partition = partitionEvent.getPartition(); - String topicName = getTopicName(partition, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); - } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() - + "." - + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); - } - } - - } - - @Override - public void onDropPartition(DropPartitionEvent partitionEvent) - throws MetaException { - // Subscriber can get notification of dropped partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" - - // Datanucleus throws NPE when we try to serialize a partition object - // retrieved from metastore. To workaround that we reset following objects - - if (partitionEvent.getStatus()) { - Partition partition = partitionEvent.getPartition(); - StorageDescriptor sd = partition.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - String topicName = getTopicName(partition, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); - } else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " - + partition.getDbName() - + "." - + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); - } - } - } - - @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) - throws MetaException { - // Subscriber can get notification about addition of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_DATABASE" - if (dbEvent.getStatus()) - send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() - .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT); - } - - @Override - public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about drop of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_DATABASE" - if (dbEvent.getStatus()) - send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() - .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT); - } - - @Override - public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about addition of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_TABLE" - if (tableEvent.getStatus()) { - Table tbl = tableEvent.getTable(); - HMSHandler handler = tableEvent.getHandler(); - HiveConf conf = handler.getHiveConf(); - Table newTbl; - try { - newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) - .deepCopy(); - newTbl.getParameters().put( - HCatConstants.HCAT_MSGBUS_TOPIC_NAME, - getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." - + newTbl.getTableName().toLowerCase()); - handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); - } catch (InvalidOperationException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } catch (NoSuchObjectException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } - send(newTbl, getTopicPrefix(conf) + "." - + newTbl.getDbName().toLowerCase(), - HCatConstants.HCAT_ADD_TABLE_EVENT); - } - } - - private String getTopicPrefix(HiveConf conf) { - return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, - HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - } - - @Override - public void onDropTable(DropTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about drop of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_TABLE" - - // Datanucleus throws NPE when we try to serialize a table object - // retrieved from metastore. To workaround that we reset following objects - - if (tableEvent.getStatus()) { - Table table = tableEvent.getTable(); - StorageDescriptor sd = table.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." - + table.getDbName().toLowerCase(), - HCatConstants.HCAT_DROP_TABLE_EVENT); - } - } - - /** - * @param msgBody - * is the metastore object. It is sent in full such that if - * subscriber is really interested in details, it can reconstruct it - * fully. In case of finalize_partition message this will be string - * specification of the partition. - * @param topicName - * is the name on message broker on which message is sent. - * @param event - * is the value of HCAT_EVENT property in message. It can be used to - * select messages in client side. - */ - protected void send(Object msgBody, String topicName, String event) { - - try { - - Destination topic = null; - if (null == session) { - // this will happen, if we never able to establish a connection. - createConnection(); - if (null == session) { - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " - + topicName + " event: " + event); - return; - } - } - try { - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - topic = session.createTopic(topicName); - } catch (IllegalStateException ise) { - // this will happen if we were able to establish connection once, but - // its no longer valid, - // ise is thrown, catch it and retry. - LOG.error("Seems like connection is lost. Retrying", ise); - createConnection(); - topic = session.createTopic(topicName); - } - if (null == topic) { - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " - + topicName + " event: " + event); - return; - } - MessageProducer producer = session.createProducer(topic); - Message msg; - if (msgBody instanceof Map) { - MapMessage mapMsg = session.createMapMessage(); - Map incomingMap = (Map) msgBody; - for (Entry partCol : incomingMap.entrySet()) { - mapMsg.setString(partCol.getKey(), partCol.getValue()); - } - msg = mapMsg; - } else { - msg = session.createObjectMessage((Serializable) msgBody); - } - - msg.setStringProperty(HCatConstants.HCAT_EVENT, event); - producer.send(msg); - // Message must be transacted before we return. - session.commit(); - } catch (Exception e) { - // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: " + topicName + " event: " - + event, e); - } - } - - protected void createConnection() { - - Context jndiCntxt; - try { - jndiCntxt = new InitialContext(); - ConnectionFactory connFac = (ConnectionFactory) jndiCntxt - .lookup("ConnectionFactory"); - Connection conn = connFac.createConnection(); - conn.start(); - conn.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException jmse) { - LOG.error(jmse.toString()); - } - }); - // We want message to be sent when session commits, thus we run in - // transacted mode. - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } catch (NamingException e) { - LOG.error("JNDI error while setting up Message Bus connection. " - + "Please make sure file named 'jndi.properties' is in " - + "classpath and contains appropriate key-value pairs.", e); - } catch (JMSException e) { - LOG.error("Failed to initialize connection to message bus", e); - } catch (Throwable t) { - LOG.error("Unable to connect to JMS provider", t); - } - } - - @Override - protected void finalize() throws Throwable { - // Close the connection before dying. - try { - if (null != session) - session.close(); - if (conn != null) { - conn.close(); - } - - } catch (Exception ignore) { - LOG.info("Failed to close message bus connection.", ignore); - } - } - - @Override - public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) - throws MetaException { - if (lpde.getStatus()) - send( - lpde.getPartitionName(), - lpde.getTable().getParameters() - .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), - HCatConstants.HCAT_PARTITION_DONE_EVENT); - } - - @Override - public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { - // no-op - } - - @Override - public void onAlterTable(AlterTableEvent ate) throws MetaException { - // no-op - } -} \ No newline at end of file diff --git src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java deleted file mode 100644 index 4da3cef..0000000 --- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.listener; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hcatalog.common.HCatConstants; - -public class TestMsgBusConnection extends TestCase { - - private Driver driver; - private BrokerService broker; - private MessageConsumer consumer; - - @Override - protected void setUp() throws Exception { - - super.setUp(); - broker = new BrokerService(); - // configure the broker - broker.addConnector("tcp://localhost:61616?broker.persistent=false"); - - broker.start(); - - System.setProperty("java.naming.factory.initial", - "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); - connectClient(); - HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, - NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } - - private void connectClient() throws JMSException { - ConnectionFactory connFac = new ActiveMQConnectionFactory( - "tcp://localhost:61616"); - Connection conn = connFac.createConnection(); - conn.start(); - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic("planetlab.hcat"); - consumer = session.createConsumer(hcatTopic); - } - - public void testConnection() throws Exception { - - try { - driver.run("create database testconndb"); - Message msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - broker.stop(); - driver.run("drop database testconndb cascade"); - broker.start(true); - connectClient(); - driver.run("create database testconndb"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - driver.run("drop database testconndb cascade"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } catch (NoSuchObjectException nsoe) { - nsoe.printStackTrace(System.err); - assert false; - } catch (AlreadyExistsException aee) { - aee.printStackTrace(System.err); - assert false; - } - } -} diff --git src/test/org/apache/hcatalog/listener/TestNotificationListener.java src/test/org/apache/hcatalog/listener/TestNotificationListener.java deleted file mode 100644 index 7d09d79..0000000 --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.listener; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; -import org.apache.hadoop.hive.metastore.api.UnknownTableException; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.thrift.TException; - -import junit.framework.TestCase; - -public class TestNotificationListener extends TestCase implements - MessageListener { - - private HiveConf hiveConf; - private Driver driver; - private AtomicInteger cntInvocation = new AtomicInteger(0); - - @Override - protected void setUp() throws Exception { - - super.setUp(); - System.setProperty("java.naming.factory.initial", - "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", - "vm://localhost?broker.persistent=false"); - ConnectionFactory connFac = new ActiveMQConnectionFactory( - "vm://localhost?broker.persistent=false"); - Connection conn = connFac.createConnection(); - conn.start(); - // We want message to be sent when session commits, thus we run in - // transacted mode. - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session - .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - MessageConsumer consumer1 = session.createConsumer(hcatTopic); - consumer1.setMessageListener(this); - Destination tblTopic = session - .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); - MessageConsumer consumer2 = session.createConsumer(tblTopic); - consumer2.setMessageListener(this); - Destination dbTopic = session - .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); - MessageConsumer consumer3 = session.createConsumer(dbTopic); - consumer3.setMessageListener(this); - hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, - NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } - - @Override - protected void tearDown() throws Exception { - assertEquals(7, cntInvocation.get()); - super.tearDown(); - } - - public void testAMQListener() throws MetaException, TException, - UnknownTableException, NoSuchObjectException, CommandNeedRetryException, - UnknownDBException, InvalidPartitionException, UnknownPartitionException { - driver.run("create database mydb"); - driver.run("use mydb"); - driver.run("create table mytbl (a string) partitioned by (b string)"); - driver.run("alter table mytbl add partition(b='2011')"); - HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); - Map kvs = new HashMap(1); - kvs.put("b", "2011"); - msc.markPartitionForEvent("mydb", "mytbl", kvs, - PartitionEventType.LOAD_DONE); - driver.run("alter table mytbl drop partition(b='2011')"); - driver.run("drop table mytbl"); - driver.run("drop database mydb"); - } - - @Override - public void onMessage(Message msg) { - cntInvocation.incrementAndGet(); - - String event; - try { - event = msg.getStringProperty(HCatConstants.HCAT_EVENT); - if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { - - assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg - .getJMSDestination().toString()); - assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { - - assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { - - assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - assertEquals(vals, part.getValues()); - } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { - - assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - assertEquals(vals, part.getValues()); - } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { - - assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { - - assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg - .getJMSDestination().toString()); - assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - MapMessage mapMsg = (MapMessage) msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; - } catch (JMSException e) { - e.printStackTrace(System.err); - assert false; - } - } -}