Index: core/src/main/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- core/src/main/java/org/apache/hcatalog/common/HCatConstants.java (revision 1430643) +++ core/src/main/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -121,13 +121,18 @@ // Message Bus related properties. public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat"; public static final String HCAT_EVENT = "HCAT_EVENT"; - public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; - public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION"; - public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE"; - public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE"; - public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE"; - public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE"; - public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE"; + public static final String HCAT_ADD_PARTITION_EVENT = "ADD_PARTITION"; + public static final String HCAT_DROP_PARTITION_EVENT = "DROP_PARTITION"; + public static final String HCAT_PARTITION_DONE_EVENT = "PARTITION_DONE"; + public static final String HCAT_CREATE_TABLE_EVENT = "CREATE_TABLE"; + public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE"; + public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE"; + public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE"; + public static final String HCAT_MESSAGE_VERSION = "HCAT_MESSAGE_VERSION"; + public static final String HCAT_MESSAGE_FORMAT = "HCAT_MESSAGE_FORMAT"; + public static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl."; + public static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format"; + public static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory"; // System environment variables public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"; Index: server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java =================================================================== --- server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 1430643) +++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java (working copy) @@ -25,7 +25,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; +import javax.jms.TextMessage; import javax.jms.Session; import junit.framework.TestCase; @@ -36,11 +36,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.jms.MessagingUtils; public class TestMsgBusConnection extends TestCase { @@ -89,29 +90,28 @@ try { driver.run("create database testconndb"); Message msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + assertTrue("Expected TextMessage", msg instanceof TextMessage); + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); + HCatEventMessage messageObject = MessagingUtils.getMessage(msg); + assertEquals("testconndb", messageObject.getDB()); broker.stop(); driver.run("drop database testconndb cascade"); broker.start(true); connectClient(); driver.run("create database testconndb"); msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); + assertEquals("testconndb", messageObject.getDB()); driver.run("drop database testconndb cascade"); msg = consumer.receive(); assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); + assertEquals("testconndb", messageObject.getDB()); } catch (NoSuchObjectException nsoe) { nsoe.printStackTrace(System.err); assert false; Index: server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1430643) +++ server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java (working copy) @@ -29,26 +29,32 @@ import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; +import javax.jms.TextMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; -import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.apache.hcatalog.messaging.MessageFactory; +import org.apache.hcatalog.messaging.jms.MessagingUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -95,10 +101,9 @@ @After public void tearDown() throws Exception { List expectedMessages = Arrays.asList( - HCatConstants.HCAT_ADD_DATABASE_EVENT, - HCatConstants.HCAT_ADD_TABLE_EVENT, + HCatConstants.HCAT_CREATE_DATABASE_EVENT, + HCatConstants.HCAT_CREATE_TABLE_EVENT, HCatConstants.HCAT_ADD_PARTITION_EVENT, - HCatConstants.HCAT_PARTITION_DONE_EVENT, HCatConstants.HCAT_DROP_PARTITION_EVENT, HCatConstants.HCAT_DROP_TABLE_EVENT, HCatConstants.HCAT_DROP_DATABASE_EVENT); @@ -125,61 +130,87 @@ String event; try { event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + String format = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT); + String version = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION); + String messageBody = ((TextMessage)msg).getText(); actualMessages.add(event); + MessageDeserializer deserializer = MessageFactory.getDeserializer(format, version); - if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { + if (event.equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) { Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - Assert.assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { + CreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(messageBody); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateDatabaseMessage); + Assert.assertEquals("mydb", message2.getDB()); + } else if (event.equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) { Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", tbl.getTableName()); - Assert.assertEquals("mydb", tbl.getDbName()); - Assert.assertEquals(1, tbl.getPartitionKeysSize()); + CreateTableMessage message = deserializer.getCreateTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((CreateTableMessage) message2).getTable()); } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", part.getTableName()); - Assert.assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - Assert.assertEquals(vals, part.getValues()); + AddPartitionMessage message = deserializer.getAddPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getPartitions().size()); + Assert.assertEquals("2011", message.getPartitions().get(0).get("b")); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof AddPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((AddPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((AddPartitionMessage) message2).getPartitions().size()); + Assert.assertEquals("2011", ((AddPartitionMessage) message2).getPartitions().get(0).get("b")); } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", part.getTableName()); - Assert.assertEquals("mydb", part.getDbName()); - List vals = new ArrayList(1); - vals.add("2011"); - Assert.assertEquals(vals, part.getValues()); + DropPartitionMessage message = deserializer.getDropPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getPartitions().size()); + Assert.assertEquals("2011", message.getPartitions().get(0).get("b")); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((DropPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((DropPartitionMessage) message2).getPartitions().size()); + Assert.assertEquals("2011", ((DropPartitionMessage) message2).getPartitions().get(0).get("b")); } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", tbl.getTableName()); - Assert.assertEquals("mydb", tbl.getDbName()); - Assert.assertEquals(1, tbl.getPartitionKeysSize()); + DropTableMessage message = deserializer.getDropTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((DropTableMessage) message2).getTable()); } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - Assert.assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); + DropDatabaseMessage message = deserializer.getDropDatabaseMessage(messageBody); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropDatabaseMessage); + Assert.assertEquals("mydb", message2.getDB()); } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - MapMessage mapMsg = (MapMessage) msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; + // TODO: Fill in when PARTITION_DONE_EVENT is supported. + Assert.assertTrue("Unexpected: HCAT_PARTITION_DONE_EVENT not supported (yet).", false); + } else { + Assert.assertTrue("Unexpected event-type: " + event, false); + } + } catch (JMSException e) { e.printStackTrace(System.err); assert false; Index: server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1430643) +++ server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -19,11 +19,8 @@ package org.apache.hcatalog.listener; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -31,7 +28,6 @@ import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; @@ -62,6 +58,8 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +78,7 @@ private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class); protected Session session; protected Connection conn; + private static MessageFactory messageFactory = MessageFactory.getInstance(); /** * Create message bus connection and session in constructor. @@ -112,7 +111,7 @@ Partition partition = partitionEvent.getPartition(); String topicName = getTopicName(partition, partitionEvent); if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); + send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), partition), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() @@ -149,7 +148,7 @@ sd.getSkewedInfo().setSkewedColNames(new ArrayList()); String topicName = getTopicName(partition, partitionEvent); if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); + send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() @@ -168,9 +167,10 @@ // Subscriber can get notification about addition of a database in HCAT // by listening on a topic named "HCAT" and message selector string // as "HCAT_EVENT = HCAT_ADD_DATABASE" - if (dbEvent.getStatus()) - send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() - .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT); + if (dbEvent.getStatus()) { + String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); + send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName); + } } @Override @@ -178,9 +178,10 @@ // Subscriber can get notification about drop of a database in HCAT // by listening on a topic named "HCAT" and message selector string // as "HCAT_EVENT = HCAT_DROP_DATABASE" - if (dbEvent.getStatus()) - send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() - .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT); + if (dbEvent.getStatus()) { + String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); + send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName); + } } @Override @@ -210,9 +211,8 @@ me.initCause(e); throw me; } - send(newTbl, getTopicPrefix(conf) + "." - + newTbl.getDbName().toLowerCase(), - HCatConstants.HCAT_ADD_TABLE_EVENT); + String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); + send(messageFactory.buildCreateTableMessage(newTbl), topicName); } } @@ -243,72 +243,60 @@ if (tableEvent.getStatus()) { Table table = tableEvent.getTable(); - StorageDescriptor sd = table.getSd(); - sd.setBucketCols(new ArrayList()); - sd.setSortCols(new ArrayList()); - sd.setParameters(new HashMap()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSkewedInfo().setSkewedColNames(new ArrayList()); - send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." - + table.getDbName().toLowerCase(), - HCatConstants.HCAT_DROP_TABLE_EVENT); + String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase(); + send(messageFactory.buildDropTableMessage(table), topicName); } } /** - * @param msgBody - * is the metastore object. It is sent in full such that if - * subscriber is really interested in details, it can reconstruct it - * fully. In case of finalize_partition message this will be string - * specification of the partition. - * @param topicName - * is the name on message broker on which message is sent. - * @param event - * is the value of HCAT_EVENT property in message. It can be used to - * select messages in client side. + * @param hCatEventMessage The HCatEventMessage being sent over JMS. + * @param topicName is the name on message broker on which message is sent. */ - protected void send(Object msgBody, String topicName, String event) { + protected void send(HCatEventMessage hCatEventMessage, String topicName) { try { - Destination topic = null; - if (null == session) { + if(null == session){ // this will happen, if we never able to establish a connection. createConnection(); - if (null == session) { + if (null == session){ // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " - + topicName + " event: " + event); + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + hCatEventMessage.getEventType()); return; } } - topic = getTopic(topicName); - if (null == topic) { + try{ + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + topic = session.createTopic(topicName); + } catch (IllegalStateException ise){ + // this will happen if we were able to establish connection once, but its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + if (null == topic){ // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: " - + topicName + " event: " + event); + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + hCatEventMessage.getEventType()); return; } + MessageProducer producer = session.createProducer(topic); - Message msg; - if (msgBody instanceof Map) { - MapMessage mapMsg = session.createMapMessage(); - Map incomingMap = (Map) msgBody; - for (Entry partCol : incomingMap.entrySet()) { - mapMsg.setString(partCol.getKey(), partCol.getValue()); - } - msg = mapMsg; - } else { - msg = session.createObjectMessage((Serializable) msgBody); - } + Message msg = session.createTextMessage(hCatEventMessage.toString()); - msg.setStringProperty(HCatConstants.HCAT_EVENT, event); + msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString()); + msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion()); + msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat()); producer.send(msg); // Message must be transacted before we return. session.commit(); - } catch (Exception e) { + } + catch(Exception e){ // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: " + topicName + " event: " - + event, e); + LOG.error("Failed to send message on topic: " + topicName + + " event: " + hCatEventMessage.getEventType(), e); } } @@ -383,12 +371,9 @@ @Override public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) throws MetaException { - if (lpde.getStatus()) - send( - lpde.getPartitionName(), - lpde.getTable().getParameters() - .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), - HCatConstants.HCAT_PARTITION_DONE_EVENT); +// TODO: Fix LoadPartitionDoneEvent. Currently, LPDE can only carry a single partition-spec. And that defeats the purpose. +// if(lpde.getStatus()) +// send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT); } @Override Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java (revision 0) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a table is created in HCatalog. + */ +public abstract class CreateTableMessage extends HCatEventMessage { + + protected CreateTableMessage() { + super(EventType.CREATE_TABLE); + } + + /** + * Getter for the name of table created in HCatalog. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java (revision 0) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a Table is dropped in HCatalog. + */ +public abstract class DropTableMessage extends HCatEventMessage { + + protected DropTableMessage() { + super(EventType.DROP_TABLE); + } + + /** + * Getter for the name of the table being dropped. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (revision 0) @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +import java.util.List; +import java.util.Map; + +/** + * HCat message sent when a partition is dropped in HCatalog. + */ +public abstract class DropPartitionMessage extends HCatEventMessage { + + protected DropPartitionMessage() { + super(EventType.DROP_PARTITION); + } + + public abstract String getTable(); + public abstract List> getPartitions (); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java (revision 0) @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +import org.apache.hcatalog.common.HCatConstants; + +/** + * Class representing messages emitted when Metastore operations are done. + * (E.g. Creation and deletion of databases, tables and partitions.) + */ +public abstract class HCatEventMessage { + + /** + * Enumeration of all supported types of Metastore operations. + */ + public static enum EventType { + + CREATE_DATABASE(HCatConstants.HCAT_CREATE_DATABASE_EVENT), + DROP_DATABASE(HCatConstants.HCAT_DROP_DATABASE_EVENT), + CREATE_TABLE(HCatConstants.HCAT_CREATE_TABLE_EVENT), + DROP_TABLE(HCatConstants.HCAT_DROP_TABLE_EVENT), + ADD_PARTITION(HCatConstants.HCAT_ADD_PARTITION_EVENT), + DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT); + + private String typeString; + + EventType(String typeString) { + this.typeString = typeString; + } + + @Override + public String toString() { return typeString; } + } + + protected EventType eventType; + + protected HCatEventMessage(EventType eventType) { + this.eventType = eventType; + } + + public EventType getEventType() { + return eventType; + } + + /** + * Getter for HCatalog Server's URL. + * (This is where the event originates from.) + * @return HCatalog Server's URL (String). + */ + public abstract String getServer(); + + /** + * Getter for the Kerberos principal of the HCatalog service. + * @return HCatalog Service Principal (String). + */ + public abstract String getServicePrincipal(); + + /** + * Getter for the name of the Database on which the Metastore operation is done. + * @return Database-name (String). + */ + public abstract String getDB(); + + /** + * Getter for the timestamp associated with the operation. + * @return Timestamp (Long - seconds since epoch). + */ + public abstract Long getTimestamp(); + + /** + * Class invariant. Checked after construction or deserialization. + */ + public HCatEventMessage checkValid() { + if (getServer() == null || getServicePrincipal() == null) + throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null."); + if (getEventType() == null) + throw new IllegalStateException("Event-type unset."); + if (getDB() == null) + throw new IllegalArgumentException("DB-name unset."); + + return this; + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (revision 0) @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +import java.util.List; +import java.util.Map; + +/** + * The HCat message sent when partition(s) are added to a table. + */ +public abstract class AddPartitionMessage extends HCatEventMessage { + + protected AddPartitionMessage() { + super(EventType.ADD_PARTITION); + } + + /** + * Getter for name of table (where partitions are added). + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Getter for list of partitions added. + * @return List of maps, where each map identifies values for each partition-key, for every added partition. + */ + public abstract List> getPartitions (); + + @Override + public HCatEventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java (revision 0) @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * Interface for converting HCat events from String-form back to HCatEventMessage instances. + */ +public abstract class MessageDeserializer { + + /** + * Method to construct HCatEventMessage from string. + */ + public HCatEventMessage getHCatEventMessage(String eventTypeString, String messageBody) { + + switch (HCatEventMessage.EventType.valueOf(eventTypeString)) { + case CREATE_DATABASE: + return getCreateDatabaseMessage(messageBody); + case DROP_DATABASE: + return getDropDatabaseMessage(messageBody); + case CREATE_TABLE: + return getCreateTableMessage(messageBody); + case DROP_TABLE: + return getDropTableMessage(messageBody); + case ADD_PARTITION: + return getAddPartitionMessage(messageBody); + case DROP_PARTITION: + return getDropPartitionMessage(messageBody); + + default: + throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); + } + } + + /** + * Method to de-serialize CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody); + + /** + * Method to de-serialize DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody); + + /** + * Method to de-serialize CreateTableMessage instance. + */ + public abstract CreateTableMessage getCreateTableMessage(String messageBody); + + /** + * Method to de-serialize DropTableMessage instance. + */ + public abstract DropTableMessage getDropTableMessage(String messageBody); + + /** + * Method to de-serialize AddPartitionMessage instance. + */ + public abstract AddPartitionMessage getAddPartitionMessage(String messageBody); + + /** + * Method to de-serialize DropPartitionMessage instance. + */ + public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); + + // Protection against construction. + protected MessageDeserializer() {} +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (revision 0) @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging.jms; + +import org.apache.commons.lang.StringUtils; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +/** + * Helper Utility to assist consumers of HCat Messages in extracting + * message-content from JMS messages. + */ +public class MessagingUtils { + + /** + * Method to return HCatEventMessage contained in the JMS message. + * @param message The JMS Message instance + * @return The contained HCatEventMessage + */ + public static HCatEventMessage getMessage(Message message) { + try { + String messageBody = ((TextMessage)message).getText(); + String eventType = message.getStringProperty(HCatConstants.HCAT_EVENT); + String messageVersion = message.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION); + String messageFormat = message.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT); + + if (StringUtils.isEmpty(messageBody) || StringUtils.isEmpty(eventType)) + throw new IllegalArgumentException("Could not extract HCatEventMessage. " + + "EventType and/or MessageBody is null/empty."); + + return MessageFactory.getDeserializer(messageFormat, messageVersion).getHCatEventMessage(eventType, messageBody); + } + catch (JMSException exception) { + throw new IllegalArgumentException("Could not extract HCatEventMessage. ", exception); + } + } + + // Prevent construction. + private MessagingUtils() {} +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (revision 0) @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging.json; + +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of AddPartitionMessage. + */ +public class JSONAddPartitionMessage extends AddPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONAddPartitionMessage() {} + + public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public String getTable() { return table; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public List> getPartitions () { return partitions; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (revision 0) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging.json; + +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * MessageDeserializer implementation, for deserializing from JSON strings. + */ +public class JSONMessageDeserializer extends MessageDeserializer { + + static ObjectMapper mapper = new ObjectMapper(); // Thread-safe. + + static { + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateDatabaseMessage.", exception); + } + } + + @Override + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateTableMessage getCreateTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateTableMessage.", exception); + } + } + + @Override + public DropTableMessage getDropTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropTableMessage.", exception); + } + } + + @Override + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAddPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct AddPartitionMessage.", exception); + } + } + + @Override + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct DropPartitionMessage.", exception); + } + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (revision 0) @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.apache.hcatalog.messaging.MessageFactory; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * The JSON implementation of the MessageFactory. Constructs JSON implementations of + * each message-type. + */ +public class JSONMessageFactory extends MessageFactory { + + private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); + + @Override + public MessageDeserializer getDeserializer() { + return deserializer; + } + + @Override + public String getVersion() { + return "0.1"; + } + + @Override + public String getMessageFormat() { + return "json"; + } + + @Override + public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) { + return new JSONCreateDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), + System.currentTimeMillis() / 1000); + } + + @Override + public DropDatabaseMessage buildDropDatabaseMessage(Database db) { + return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), + System.currentTimeMillis() / 1000); + } + + @Override + public CreateTableMessage buildCreateTableMessage(Table table) { + return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), System.currentTimeMillis()/1000); + } + + @Override + public DropTableMessage buildDropTableMessage(Table table) { + return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), + System.currentTimeMillis()/1000); + } + + @Override + public AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition) { + return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), + partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), + System.currentTimeMillis()/1000); + } + + @Override + public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) { + return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(), + partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), + System.currentTimeMillis()/1000); + } + + private static Map getPartitionKeyValues(Table table, Partition partition) { + Map partitionKeys = new LinkedHashMap(); + for (int i=0; i> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONDropPartitionMessage() {} + + public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public String getTable() { return table; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public List> getPartitions () { return partitions; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (revision 0) @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.messaging.json.JSONMessageFactory; + +/** + * Abstract Factory for the construction of HCatalog message instances. + */ +public abstract class MessageFactory { + + private static MessageFactory instance = new JSONMessageFactory(); + + protected static final HiveConf hiveConf = new HiveConf(); + static { + hiveConf.addResource("hive-site.xml"); + } + + private static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl."; + private static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format"; + private static final String HCAT_MESSAGE_FORMAT = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FORMAT, "json"); + private static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory"; + private static final String HCAT_MESSAGE_FACTORY_IMPL = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + + HCAT_MESSAGE_FORMAT, + DEFAULT_MESSAGE_FACTORY_IMPL); + + protected static final String HCAT_SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.name(), ""); + protected static final String HCAT_SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.name(), ""); + + /** + * Getter for MessageFactory instance. + */ + public static MessageFactory getInstance() { + if (instance == null) { + instance = getInstance(HCAT_MESSAGE_FACTORY_IMPL); + } + return instance; + } + + private static MessageFactory getInstance(String className) { + try { + return (MessageFactory)ReflectionUtils.newInstance(Class.forName(className), hiveConf); + } + catch (ClassNotFoundException classNotFound) { + throw new IllegalStateException("Could not construct MessageFactory implementation: ", classNotFound); + } + } + + /** + * Getter for MessageDeserializer, corresponding to the specified format and version. + * @param format Serialization format for notifications. + * @param version Version of serialization format (currently ignored.) + * @return MessageDeserializer. + */ + public static MessageDeserializer getDeserializer(String format, + String version) { + return getInstance(hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format, + DEFAULT_MESSAGE_FACTORY_IMPL)).getDeserializer(); + } + + public abstract MessageDeserializer getDeserializer(); + + /** + * Getter for version-string, corresponding to all constructed messages. + */ + public abstract String getVersion(); + + /** + * Getter for message-format. + */ + public abstract String getMessageFormat(); + + /** + * Factory method for CreateDatabaseMessage. + * @param db The Database being added. + * @return CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db); + + /** + * Factory method for DropDatabaseMessage. + * @param db The Database being dropped. + * @return DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db); + + /** + * Factory method for CreateTableMessage. + * @param table The Table being created. + * @return CreateTableMessage instance. + */ + public abstract CreateTableMessage buildCreateTableMessage(Table table); + + /** + * Factory method for DropTableMessage. + * @param table The Table being dropped. + * @return DropTableMessage instance. + */ + public abstract DropTableMessage buildDropTableMessage(Table table); + + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partition is added. + * @param partition The Partition being added. + * @return AddPartitionMessage instance. + */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition); + + /** + * Factory method for DropPartitionMessage. + * @param table The Table from which the partition is dropped. + * @param partition The Partition being dropped. + * @return DropPartitionMessage instance. + */ + public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition); +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (revision 0) @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a Database is created in HCatalog. + */ +public abstract class CreateDatabaseMessage extends HCatEventMessage { + + protected CreateDatabaseMessage() { + super(EventType.CREATE_DATABASE); + } + +} Index: server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java =================================================================== --- server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (revision 0) +++ server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (revision 0) @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hcatalog.messaging; + +/** + * HCat message sent when a Database is dropped from HCatalog. + */ +public abstract class DropDatabaseMessage extends HCatEventMessage { + + protected DropDatabaseMessage() { + super(EventType.DROP_DATABASE); + } +}