Index: src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 1430712) +++ src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (working copy) @@ -24,8 +24,8 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.TextMessage; import junit.framework.TestCase; @@ -35,11 +35,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.jms.MessagingUtils; public class TestMsgBusConnection extends TestCase{ @@ -85,23 +86,25 @@ try{ driver.run("create database testconndb"); Message msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertTrue("Expected TextMessage.", msg instanceof TextMessage); + HCatEventMessage hCatEventMessage = MessagingUtils.getMessage(msg); + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + assertEquals("testconndb", hCatEventMessage.getDB()); broker.stop(); driver.run("drop database testconndb cascade"); broker.start(true); connectClient(); driver.run("create database testconndb"); msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + assertEquals("testconndb", hCatEventMessage.getDB()); driver.run("drop database testconndb cascade"); msg = consumer.receive(); assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + assertEquals("testconndb", hCatEventMessage.getDB()); } catch (NoSuchObjectException nsoe){ nsoe.printStackTrace(System.err); assert false; Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1430712) +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (working copy) @@ -19,165 +19,200 @@ package org.apache.hcatalog.listener; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; -import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; -import org.apache.hadoop.hive.metastore.api.UnknownTableException; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; -import org.apache.thrift.TException; +import org.apache.hcatalog.mapreduce.HCatBaseTest; -import junit.framework.TestCase; +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.apache.hcatalog.messaging.MessageFactory; +import org.apache.hcatalog.messaging.jms.MessagingUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -public class TestNotificationListener extends TestCase implements MessageListener{ +public class TestNotificationListener extends HCatBaseTest implements MessageListener { - private HiveConf hiveConf; - private Driver driver; - private AtomicInteger cntInvocation = new AtomicInteger(0); + private List actualMessages = new ArrayList(); - @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", + "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Destination dbTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); + MessageConsumer consumer3 = session.createConsumer(dbTopic); + consumer3.setMessageListener(this); - super.setUp(); - System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); - ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Connection conn = connFac.createConnection(); - conn.start(); - // We want message to be sent when session commits, thus we run in - // transacted mode. - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - MessageConsumer consumer1 = session.createConsumer(hcatTopic); - consumer1.setMessageListener(this); - Destination tblTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl"); - MessageConsumer consumer2 = session.createConsumer(tblTopic); - consumer2.setMessageListener(this); - Destination dbTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb"); - MessageConsumer consumer3 = session.createConsumer(dbTopic); - consumer3.setMessageListener(this); - hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } + setUpHiveConf(); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + client = new HiveMetaStoreClient(hiveConf); + } - @Override - protected void tearDown() throws Exception { - assertEquals(7, cntInvocation.get()); - super.tearDown(); - } + @After + public void tearDown() throws Exception { + List expectedMessages = Arrays.asList( + HCatConstants.HCAT_CREATE_DATABASE_EVENT, + HCatConstants.HCAT_CREATE_TABLE_EVENT, + HCatConstants.HCAT_ADD_PARTITION_EVENT, + HCatConstants.HCAT_DROP_PARTITION_EVENT, + HCatConstants.HCAT_DROP_TABLE_EVENT, + HCatConstants.HCAT_DROP_DATABASE_EVENT); + Assert.assertEquals(expectedMessages, actualMessages); + } - public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, - CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{ - driver.run("create database mydb"); - driver.run("use mydb"); - driver.run("create table mytbl (a string) partitioned by (b string)"); - driver.run("alter table mytbl add partition(b='2011')"); - HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); - Map kvs = new HashMap(1); - kvs.put("b", "2011"); - msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE); - driver.run("alter table mytbl drop partition(b='2011')"); - driver.run("drop table mytbl"); - driver.run("drop database mydb"); - } + @Test + public void testAMQListener() throws Exception { + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + Map kvs = new HashMap(1); + kvs.put("b", "2011"); + client.markPartitionForEvent("mydb", "mytbl", kvs, + PartitionEventType.LOAD_DONE); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } - @Override - public void onMessage(Message msg) { - cntInvocation.incrementAndGet(); + @Override + public void onMessage(Message msg) { + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + String format = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT); + String version = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION); + String messageBody = ((TextMessage)msg).getText(); + actualMessages.add(event); + MessageDeserializer deserializer = MessageFactory.getDeserializer(format, version); - String event; - try { - event = msg.getStringProperty(HCatConstants.HCAT_EVENT); - if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){ + if (event.equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) { - assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); - assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } - else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){ + Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + CreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(messageBody); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateDatabaseMessage); + Assert.assertEquals("mydb", message2.getDB()); + } else if (event.equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) { - assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString()); - Table tbl = (Table)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } - else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){ + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + CreateTableMessage message = deserializer.getCreateTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((CreateTableMessage) message2).getTable()); + } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - Partition part = (Partition)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - assertEquals(vals,part.getValues()); - } - else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){ + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + AddPartitionMessage message = deserializer.getAddPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getPartitions().size()); + Assert.assertEquals("2011", message.getPartitions().get(0).get("b")); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof AddPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((AddPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((AddPartitionMessage) message2).getPartitions().size()); + Assert.assertEquals("2011", ((AddPartitionMessage) message2).getPartitions().get(0).get("b")); + } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - Partition part = (Partition)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - assertEquals(vals,part.getValues()); - } - else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){ + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + DropPartitionMessage message = deserializer.getDropPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getPartitions().size()); + Assert.assertEquals("2011", message.getPartitions().get(0).get("b")); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((DropPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((DropPartitionMessage) message2).getPartitions().size()); + Assert.assertEquals("2011", ((DropPartitionMessage) message2).getPartitions().get(0).get("b")); + } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { - assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString()); - Table tbl = (Table)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } - else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){ + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + DropTableMessage message = deserializer.getDropTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((DropTableMessage) message2).getTable()); + } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { - assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); - assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } - else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - MapMessage mapMsg = (MapMessage)msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; - } catch (JMSException e) { - e.printStackTrace(System.err); - assert false; - } - } + Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + DropDatabaseMessage message = deserializer.getDropDatabaseMessage(messageBody); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropDatabaseMessage); + Assert.assertEquals("mydb", message2.getDB()); + } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { + // TODO: Fill in when PARTITION_DONE_EVENT is supported. + Assert.assertTrue("Unexpected: HCAT_PARTITION_DONE_EVENT not supported (yet).", false); + } else { + Assert.assertTrue("Unexpected event-type: " + event, false); + } + + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + } + } } Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1430712) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -18,11 +18,8 @@ package org.apache.hcatalog.listener; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -30,7 +27,6 @@ import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; @@ -63,6 +59,8 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageFactory; /** * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} @@ -80,6 +78,8 @@ protected Session session; protected Connection conn; + private static MessageFactory messageFactory = MessageFactory.getInstance(); + /** * Create message bus connection and session in constructor. */ @@ -101,27 +101,28 @@ } @Override - public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { - // Subscriber can get notification of newly add partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" - if(partitionEvent.getStatus()){ + public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + if(partitionEvent.getStatus()){ - Partition partition = partitionEvent.getPartition(); - String topicName = getTopicName(partition, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); - } - else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() - + "." + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); - } - } + Partition partition = partitionEvent.getPartition(); + String topicName = getTopicName(partition, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), partition), + topicName); + } + else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() + + "." + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } - } + } @Override public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { @@ -132,44 +133,44 @@ // Datanucleus throws NPE when we try to serialize a partition object // retrieved from metastore. To workaround that we reset following objects - if(partitionEvent.getStatus()){ - Partition partition = partitionEvent.getPartition(); - StorageDescriptor sd = partition.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - String topicName = getTopicName(partition, partitionEvent); - if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); - } - else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() - + "." + partition.getTableName() - + " To enable notifications for this table, please do alter table set properties (" - + HCatConstants.HCAT_MSGBUS_TOPIC_NAME - + "=.) or whatever you want topic name to be."); - } - } - } + if(partitionEvent.getStatus()){ + Partition partition = partitionEvent.getPartition(); + String topicName = getTopicName(partition, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), + topicName); + } + else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() + + "." + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + } - @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about addition of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_DATABASE" - if(dbEvent.getStatus()) - send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT); - } + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about addition of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_DATABASE" + if(dbEvent.getStatus()) { + String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); + send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName); + } + } - @Override - public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about drop of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_DATABASE" - if(dbEvent.getStatus()) - send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT); - } + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about drop of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_DATABASE" + if(dbEvent.getStatus()) { + String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); + send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName); + } + } @Override public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { @@ -192,11 +193,12 @@ me.initCause(e); throw me; } catch (NoSuchObjectException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); + MetaException me = new MetaException(e.toString()); + me.initCause(e); throw me; } - send(newTbl, getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT); + String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); + send(messageFactory.buildCreateTableMessage(newTbl), topicName); } } @@ -220,66 +222,62 @@ sd.setSortCols(new ArrayList()); sd.setParameters(new HashMap()); sd.getSerdeInfo().setParameters(new HashMap()); - send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName().toLowerCase(), HCatConstants.HCAT_DROP_TABLE_EVENT); + String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase(); + send(messageFactory.buildDropTableMessage(table), topicName); } } - /** - * @param msgBody is the metastore object. It is sent in full such that - * if subscriber is really interested in details, it can reconstruct it fully. - * In case of finalize_partition message this will be string specification of - * the partition. - * @param topicName is the name on message broker on which message is sent. - * @param event is the value of HCAT_EVENT property in message. It can be - * used to select messages in client side. - */ - protected void send(Object msgBody, String topicName, String event){ - - try{ - - if(null == session){ - // this will happen, if we never able to establish a connection. - createConnection(); - if (null == session){ - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: "+ - topicName + " event: "+event); - return; - } - } + /** + * @param hCatEventMessage The HCatEventMessage being sent over JMS. + * @param topicName is the name on message broker on which message is sent. + */ + protected void send(HCatEventMessage hCatEventMessage, String topicName) { + try { Destination topic = null; - topic = getTopic(topicName); + if(null == session){ + // this will happen, if we never able to establish a connection. + createConnection(); + if (null == session){ + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + hCatEventMessage.getEventType()); + return; + } + } + try{ + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise){ + // this will happen if we were able to establish connection once, but its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } if (null == topic){ // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: "+ - topicName + " event: "+ event); + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + hCatEventMessage.getEventType()); return; } MessageProducer producer = session.createProducer(topic); - Message msg; - if (msgBody instanceof Map){ - MapMessage mapMsg = session.createMapMessage(); - Map incomingMap = (Map)msgBody; - for (Entry partCol : incomingMap.entrySet()){ - mapMsg.setString(partCol.getKey(), partCol.getValue()); - } - msg = mapMsg; - } - else { - msg = session.createObjectMessage((Serializable)msgBody); - } + Message msg = session.createTextMessage(hCatEventMessage.toString()); - msg.setStringProperty(HCatConstants.HCAT_EVENT, event); - producer.send(msg); - // Message must be transacted before we return. - session.commit(); - } catch(Exception e){ - // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: "+topicName + - " event: "+event , e); - } - } + msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString()); + msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion()); + msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat()); + producer.send(msg); + // Message must be transacted before we return. + session.commit(); + } + catch(Exception e){ + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: " + topicName + + " event: " + hCatEventMessage.getEventType(), e); + } + } /** * Get the topic object for the topicName, it also tries to reconnect @@ -350,8 +348,9 @@ @Override public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) throws MetaException { - if(lpde.getStatus()) - send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT); +// TODO: Fix LoadPartitionDoneEvent. Currently, LPDE can only carry a single partition-spec. And that defeats the purpose. +// if(lpde.getStatus()) +// send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT); } @Override Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1430712) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -98,15 +98,20 @@ // Message Bus related properties. public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat"; public static final String HCAT_EVENT = "HCAT_EVENT"; - public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; - public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION"; - public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE"; - public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE"; - public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE"; - public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE"; - public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE"; + public static final String HCAT_ADD_PARTITION_EVENT = "ADD_PARTITION"; + public static final String HCAT_DROP_PARTITION_EVENT = "DROP_PARTITION"; + public static final String HCAT_PARTITION_DONE_EVENT = "PARTITION_DONE"; + public static final String HCAT_CREATE_TABLE_EVENT = "CREATE_TABLE"; + public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE"; + public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE"; + public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE"; + public static final String HCAT_MESSAGE_VERSION = "HCAT_MESSAGE_VERSION"; + public static final String HCAT_MESSAGE_FORMAT = "HCAT_MESSAGE_FORMAT"; + public static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl."; + public static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format"; + public static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory"; - // System environment variables + // System environment variables public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"; // Hadoop Conf Var Names Index: src/java/org/apache/hcatalog/messaging/CreateTableMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/CreateTableMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/CreateTableMessage.java (revision 0) @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a table is created in HCatalog. + */ +public abstract class CreateTableMessage extends HCatEventMessage { + + protected CreateTableMessage() { + super(EventType.CREATE_TABLE); + } + + /** + * Getter for the name of table created in HCatalog. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} Index: src/java/org/apache/hcatalog/messaging/DropTableMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/DropTableMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/DropTableMessage.java (revision 0) @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a Table is dropped in HCatalog. + */ +public abstract class DropTableMessage extends HCatEventMessage { + + protected DropTableMessage() { + super(EventType.DROP_TABLE); + } + + /** + * Getter for the name of the table being dropped. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} Index: src/java/org/apache/hcatalog/messaging/DropPartitionMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (revision 0) @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +import java.util.List; +import java.util.Map; + +/** + * HCat message sent when a partition is dropped in HCatalog. + */ +public abstract class DropPartitionMessage extends HCatEventMessage { + + protected DropPartitionMessage() { + super(EventType.DROP_PARTITION); + } + + public abstract String getTable(); + public abstract List> getPartitions (); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} Index: src/java/org/apache/hcatalog/messaging/HCatEventMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/HCatEventMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/HCatEventMessage.java (revision 0) @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +import org.apache.hcatalog.common.HCatConstants; + +/** + * Class representing messages emitted when Metastore operations are done. + * (E.g. Creation and deletion of databases, tables and partitions.) + */ +public abstract class HCatEventMessage { + + /** + * Enumeration of all supported types of Metastore operations. + */ + public static enum EventType { + + CREATE_DATABASE(HCatConstants.HCAT_CREATE_DATABASE_EVENT), + DROP_DATABASE(HCatConstants.HCAT_DROP_DATABASE_EVENT), + CREATE_TABLE(HCatConstants.HCAT_CREATE_TABLE_EVENT), + DROP_TABLE(HCatConstants.HCAT_DROP_TABLE_EVENT), + ADD_PARTITION(HCatConstants.HCAT_ADD_PARTITION_EVENT), + DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT); + + private String typeString; + + EventType(String typeString) { + this.typeString = typeString; + } + + @Override + public String toString() { return typeString; } + } + + protected EventType eventType; + + protected HCatEventMessage(EventType eventType) { + this.eventType = eventType; + } + + public EventType getEventType() { + return eventType; + } + + /** + * Getter for HCatalog Server's URL. + * (This is where the event originates from.) + * @return HCatalog Server's URL (String). + */ + public abstract String getServer(); + + /** + * Getter for the Kerberos principal of the HCatalog service. + * @return HCatalog Service Principal (String). + */ + public abstract String getServicePrincipal(); + + /** + * Getter for the name of the Database on which the Metastore operation is done. + * @return Database-name (String). + */ + public abstract String getDB(); + + /** + * Getter for the timestamp associated with the operation. + * @return Timestamp (Long - seconds since epoch). + */ + public abstract Long getTimestamp(); + + /** + * Class invariant. Checked after construction or deserialization. + */ + public HCatEventMessage checkValid() { + if (getServer() == null || getServicePrincipal() == null) + throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null."); + if (getEventType() == null) + throw new IllegalStateException("Event-type unset."); + if (getDB() == null) + throw new IllegalArgumentException("DB-name unset."); + + return this; + } +} Index: src/java/org/apache/hcatalog/messaging/AddPartitionMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +import java.util.List; +import java.util.Map; + +/** + * The HCat message sent when partition(s) are added to a table. + */ +public abstract class AddPartitionMessage extends HCatEventMessage { + + protected AddPartitionMessage() { + super(EventType.ADD_PARTITION); + } + + /** + * Getter for name of table (where partitions are added). + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Getter for list of partitions added. + * @return List of maps, where each map identifies values for each partition-key, for every added partition. + */ + public abstract List> getPartitions (); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} Index: src/java/org/apache/hcatalog/messaging/MessageDeserializer.java =================================================================== --- src/java/org/apache/hcatalog/messaging/MessageDeserializer.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/MessageDeserializer.java (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * Interface for converting HCat events from String-form back to HCatEventMessage instances. + */ +public abstract class MessageDeserializer { + + /** + * Method to construct HCatEventMessage from string. + */ + public HCatEventMessage getHCatEventMessage(String eventTypeString, String messageBody) { + + switch (HCatEventMessage.EventType.valueOf(eventTypeString)) { + case CREATE_DATABASE: return getCreateDatabaseMessage(messageBody); + case DROP_DATABASE: return getDropDatabaseMessage(messageBody); + case CREATE_TABLE: return getCreateTableMessage(messageBody); + case DROP_TABLE: return getDropTableMessage(messageBody); + case ADD_PARTITION: return getAddPartitionMessage(messageBody); + case DROP_PARTITION: return getDropPartitionMessage(messageBody); + + default: + throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); + } + } + + /** + * Method to de-serialize CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody); + + /** + * Method to de-serialize DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody); + + /** + * Method to de-serialize CreateTableMessage instance. + */ + public abstract CreateTableMessage getCreateTableMessage(String messageBody); + + /** + * Method to de-serialize DropTableMessage instance. + */ + public abstract DropTableMessage getDropTableMessage(String messageBody); + + /** + * Method to de-serialize AddPartitionMessage instance. + */ + public abstract AddPartitionMessage getAddPartitionMessage(String messageBody); + + /** + * Method to de-serialize DropPartitionMessage instance. + */ + public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); + + // Protection against construction. + protected MessageDeserializer() {} +} Index: src/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java =================================================================== --- src/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (revision 0) @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging.jms; + +import org.apache.commons.lang.StringUtils; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +/** + * Helper Utility to assist consumers of HCat Messages in extracting + * message-content from JMS messages. + */ +public class MessagingUtils { + + /** + * Method to return HCatEventMessage contained in the JMS message. + * @param message The JMS Message instance + * @return The contained HCatEventMessage + */ + public static HCatEventMessage getMessage(Message message) { + try { + String messageBody = ((TextMessage)message).getText(); + String eventType = message.getStringProperty(HCatConstants.HCAT_EVENT); + String messageVersion = message.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION); + String messageFormat = message.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT); + + if (StringUtils.isEmpty(messageBody) || StringUtils.isEmpty(eventType)) + throw new IllegalArgumentException("Could not extract HCatEventMessage. " + + "EventType and/or MessageBody is null/empty."); + + return MessageFactory.getDeserializer(messageFormat, messageVersion).getHCatEventMessage(eventType, messageBody); + } + catch (JMSException exception) { + throw new IllegalArgumentException("Could not extract HCatEventMessage. ", exception); + } + } + + // Prevent instantiation. + private MessagingUtils() {} +} Index: src/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (revision 0) @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging.json; + +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of AddPartitionMessage. + */ +public class JSONAddPartitionMessage extends AddPartitionMessage { + + @JsonProperty + String server, + servicePrincipal, + db, + table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONAddPartitionMessage() {} + + public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public String getTable() { return table; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public List> getPartitions () { return partitions; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + } Index: src/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java =================================================================== --- src/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (revision 0) @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging.json; + +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * MessageDeserializer implementation, for deserializing from JSON strings. + */ +public class JSONMessageDeserializer extends MessageDeserializer { + + static ObjectMapper mapper = new ObjectMapper(); // Thread-safe. + + static { + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateDatabaseMessage.", exception); + } + } + + @Override + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateTableMessage getCreateTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateTableMessage.", exception); + } + } + + @Override + public DropTableMessage getDropTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropTableMessage.", exception); + } + } + + @Override + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAddPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct AddPartitionMessage.", exception); + } + } + + @Override + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct DropPartitionMessage.", exception); + } + } +} Index: src/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java =================================================================== --- src/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision 0) @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.apache.hcatalog.messaging.MessageFactory; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * The JSON implementation of the MessageFactory. Constructs JSON implementations of + * each message-type. + */ +public class JSONMessageFactory extends MessageFactory { + + private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); + + @Override + public MessageDeserializer getDeserializer() { + return deserializer; + } + + @Override + public String getVersion() { + return "0.1"; + } + + @Override + public String getMessageFormat() { + return "json"; + } + + @Override + public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) { + return new JSONCreateDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), + System.currentTimeMillis() / 1000); + } + + @Override + public DropDatabaseMessage buildDropDatabaseMessage(Database db) { + return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), + System.currentTimeMillis() / 1000); + } + + @Override + public CreateTableMessage buildCreateTableMessage(Table table) { + return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), System.currentTimeMillis()/1000); + } + + @Override + public DropTableMessage buildDropTableMessage(Table table) { + return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), + System.currentTimeMillis()/1000); + } + + @Override + public AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition) { + return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), + partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), + System.currentTimeMillis()/1000); + } + + @Override + public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) { + return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), + partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), + System.currentTimeMillis()/1000); + } + + private static Map getPartitionKeyValues(Table table, Partition partition) { + Map partitionKeys = new LinkedHashMap(); + for (int i=0; i> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONDropPartitionMessage() {} + + public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public String getTable() { return table; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public List> getPartitions () { return partitions; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} Index: src/java/org/apache/hcatalog/messaging/MessageFactory.java =================================================================== --- src/java/org/apache/hcatalog/messaging/MessageFactory.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/MessageFactory.java (revision 0) @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.common.HCatConstants; + +/** + * Abstract Factory for the construction of HCatalog message instances. + */ +public abstract class MessageFactory { + + private static MessageFactory instance = null; + private static Log LOG = LogFactory.getLog(MessageFactory.class); + + protected static final HiveConf hiveConf = new HiveConf(); + static { + hiveConf.addResource("hive-site.xml"); + } + + private static final String HCAT_MESSAGE_FORMAT = hiveConf.get(HCatConstants.CONF_LABEL_HCAT_MESSAGE_FORMAT, "json"); + private static final String HCAT_MESSAGE_FACTORY_IMPL + = hiveConf.get(HCatConstants.CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + HCAT_MESSAGE_FORMAT, HCatConstants.DEFAULT_MESSAGE_FACTORY_IMPL); + + protected static final String HCAT_SERVER_URL = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS); + protected static final String HCAT_SERVICE_PRINCIPAL = hiveConf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); + + /** + * Getter for MessageFactory instance. + */ + public static MessageFactory getInstance() { + if (instance == null) { + instance = getInstance(HCAT_MESSAGE_FACTORY_IMPL); + } + + return instance; + } + + private static MessageFactory getInstance(String className) { + try { + return (MessageFactory)ReflectionUtils.newInstance(Class.forName(className), hiveConf); + } + catch (ClassNotFoundException classNotFound) { + throw new IllegalStateException("Could not construct MessageFactory implementation: ", classNotFound); + } + } + + /** + * Getter for MessageDeserializer, corresponding to the specified format and version. + * @param format Serialization format for notifications. + * @param version Version of serialization format (currently ignored.) + * @return MessageDeserializer. + */ + public static MessageDeserializer getDeserializer(String format, + String version) { + return getInstance(hiveConf.get(HCatConstants.CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format, + HCatConstants.DEFAULT_MESSAGE_FACTORY_IMPL)).getDeserializer(); + } + + public abstract MessageDeserializer getDeserializer(); + + /** + * Getter for version-string, corresponding to all constructed messages. + */ + public abstract String getVersion(); + + /** + * Getter for message-format. + */ + public abstract String getMessageFormat(); + + /** + * Factory method for CreateDatabaseMessage. + * @param db The Database being added. + * @return CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db); + + /** + * Factory method for DropDatabaseMessage. + * @param db The Database being dropped. + * @return DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db); + + /** + * Factory method for CreateTableMessage. + * @param table The Table being created. + * @return CreateTableMessage instance. + */ + public abstract CreateTableMessage buildCreateTableMessage(Table table); + + /** + * Factory method for DropTableMessage. + * @param table The Table being dropped. + * @return DropTableMessage instance. + */ + public abstract DropTableMessage buildDropTableMessage(Table table); + + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partition is added. + * @param partition The Partition being added. + * @return AddPartitionMessage instance. + */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition); + + /** + * Factory method for DropPartitionMessage. + * @param table The Table from which the partition is dropped. + * @param partition The Partition being dropped. + * @return DropPartitionMessage instance. + */ + public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition); +} Index: src/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (revision 0) @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a Database is created in HCatalog. + */ +public abstract class CreateDatabaseMessage extends HCatEventMessage { + + protected CreateDatabaseMessage() { + super(EventType.CREATE_DATABASE); + } + +} Index: src/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java =================================================================== --- src/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (revision 0) +++ src/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (revision 0) @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a Database is dropped from HCatalog. + */ +public abstract class DropDatabaseMessage extends HCatEventMessage { + + protected DropDatabaseMessage() { + super(EventType.DROP_DATABASE); + } +} Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1430712) +++ ivy/libraries.properties (working copy) @@ -33,7 +33,7 @@ hadoop23.version=0.23.3 hbase.version=0.92.0 high-scale-lib.version=1.1.1 -hive.version=0.9.0 +hive.version=0.9.1-SNAPSHOT ivy.version=2.1.0 jackson.version=1.7.3 javax-mgmt.version=1.1-rev-1