Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1136173) +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (working copy) @@ -19,13 +19,16 @@ package org.apache.hcatalog.listener; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -36,11 +39,16 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; @@ -89,15 +97,20 @@ @Override protected void tearDown() throws Exception { - assertEquals(6, cntInvocation.get()); + assertEquals(7, cntInvocation.get()); super.tearDown(); } - public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{ + public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, + CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{ driver.run("create database mydb"); driver.run("use mydb"); driver.run("create table mytbl (a string) partitioned by (b string)"); driver.run("alter table mytbl add partition(b='2011')"); + HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); + Map kvs = new HashMap(1); + kvs.put("b", "2011"); + msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE); driver.run("alter table mytbl drop partition(b='2011')"); driver.run("drop table mytbl"); driver.run("drop database mydb"); @@ -156,7 +169,11 @@ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); } - else + else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { + assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString()); + MapMessage mapMsg = (MapMessage)msg; + assert mapMsg.getString("b").equals("2011"); + } else assert false; } catch (JMSException e) { e.printStackTrace(System.err); Index: src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java =================================================================== --- src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java (revision 1136173) +++ src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java (working copy) @@ -1,66 +0,0 @@ -package org.apache.hcatalog.metadata; - -public class HPartitionDoneEvent { - - private String dbName; - - private String tblName; - - private String partVals; - - private Long createTime; - - /** - * @return the createTime - */ - public Long getCreateTime() { - return createTime; - } - - HPartitionDoneEvent(String dbName, String tblName, String partSpec) { - super(); - this.dbName = dbName; - this.tblName = tblName; - this.partVals = partSpec; - this.createTime = System.currentTimeMillis(); - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "HPartitionDoneEvent [dbName=" + dbName + ", tblName=" + tblName - + ", partSpec=" + partVals + ", createTime=" + createTime + "]"; - } - - public HPartitionDoneEvent() {} - - /** - * @param dbName the dbName to set - */ - public void setDbName(String dbName) { - this.dbName = dbName; - } - - /** - * @param tblName the tblName to set - */ - public void setTblName(String tblName) { - this.tblName = tblName; - } - - /** - * @param partSpec the partSpec to set - */ - public void setPartSpec(String partSpec) { - this.partVals = partSpec; - } - - /** - * @param createTime the createTime to set - */ - public void setCreateTime(Long createTime) { - this.createTime = createTime; - } -} Index: src/java/org/apache/hcatalog/metadata/ormodel.jdo =================================================================== --- src/java/org/apache/hcatalog/metadata/ormodel.jdo (revision 1136173) +++ src/java/org/apache/hcatalog/metadata/ormodel.jdo (working copy) @@ -1,22 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1136173) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -21,6 +21,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -28,6 +30,8 @@ import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; @@ -53,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hcatalog.common.HCatConstants; /** @@ -201,7 +206,7 @@ * @param event is the value of HCAT_EVENT property in message. It can be * used to select messages in client side. */ - private void send(Serializable msgBody, String topicName, String event){ + private void send(Object msgBody, String topicName, String event){ try{ @@ -234,7 +239,19 @@ return; } MessageProducer producer = session.createProducer(topic); - ObjectMessage msg = session.createObjectMessage(msgBody); + Message msg; + if (msgBody instanceof Map){ + MapMessage mapMsg = session.createMapMessage(); + Map incomingMap = (Map)msgBody; + for (Entry partCol : incomingMap.entrySet()){ + mapMsg.setString(partCol.getKey(), partCol.getValue()); + } + msg = mapMsg; + } + else { + msg = session.createObjectMessage((Serializable)msgBody); + } + msg.setStringProperty(HCatConstants.HCAT_EVENT, event); producer.send(msg); // Message must be transacted before we return. @@ -288,4 +305,11 @@ LOG.info("Failed to close message bus connection.", ignore); } } + + @Override + public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) + throws MetaException { + if(lpde.getStatus()) + send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT); + } } Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1136173) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -76,6 +76,7 @@ public static final String HCAT_EVENT = "HCAT_EVENT"; public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION"; + public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE"; public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE"; public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE"; public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE"; Index: build.xml =================================================================== --- build.xml (revision 1136173) +++ build.xml (working copy) @@ -231,7 +231,6 @@ - @@ -259,32 +258,6 @@ --> - - - - - - - - - - - - - - - - - - - -