Index: ivy.xml
===================================================================
--- ivy.xml (revision 1127164)
+++ ivy.xml (working copy)
@@ -43,5 +43,8 @@
-->
-
+
+
+
+
Index: ivy/ivysettings.xml
===================================================================
--- ivy/ivysettings.xml (revision 1127164)
+++ ivy/ivysettings.xml (working copy)
@@ -36,10 +36,17 @@
+
+
+
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties (revision 1127164)
+++ ivy/libraries.properties (working copy)
@@ -18,4 +18,6 @@
pig.version=0.8.0
commons-cli.version=1.0
#hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven
-
+jms.version=1.1
+activemq.version=5.5.0
+javax-mgmt.version=1.1-rev-1
Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java
===================================================================
--- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 0)
+++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 0)
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.thrift.TException;
+
+import junit.framework.TestCase;
+
+public class TestNotificationListener extends TestCase implements MessageListener{
+
+ private HiveConf hiveConf;
+ private Driver driver;
+ private AtomicInteger cntInvocation = new AtomicInteger(0);
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false");
+ ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC);
+ MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+ consumer1.setMessageListener(this);
+ Destination tblTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl");
+ MessageConsumer consumer2 = session.createConsumer(tblTopic);
+ consumer2.setMessageListener(this);
+ Destination dbTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb");
+ MessageConsumer consumer3 = session.createConsumer(dbTopic);
+ consumer3.setMessageListener(this);
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ assertEquals(6, cntInvocation.get());
+ super.tearDown();
+ }
+
+ public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{
+ driver.run("create database mydb");
+ driver.run("use mydb");
+ driver.run("create table mytbl (a string) partitioned by (b string)");
+ driver.run("alter table mytbl add partition(b='2011')");
+ driver.run("alter table mytbl drop partition(b='2011')");
+ driver.run("drop table mytbl");
+ driver.run("drop database mydb");
+ }
+
+ @Override
+ public void onMessage(Message msg) {
+ cntInvocation.incrementAndGet();
+
+ String event;
+ try {
+ event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+ if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
+
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ }
+ else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
+
+ assertEquals("topic://HCAT.mydb",msg.getJMSDestination().toString());
+ Table tbl = (Table)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ }
+ else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){
+
+ assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+ Partition part = (Partition)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List vals = new ArrayList(1);
+ vals.add("2011");
+ assertEquals(vals,part.getValues());
+ }
+ else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){
+
+ assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+ Partition part = (Partition)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List vals = new ArrayList(1);
+ vals.add("2011");
+ assertEquals(vals,part.getValues());
+ }
+ else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){
+
+ assertEquals("topic://HCAT.mydb",msg.getJMSDestination().toString());
+ Table tbl = (Table)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ }
+ else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
+
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ }
+ else
+ assert false;
+ } catch (JMSException e) {
+ e.printStackTrace(System.err);
+ assert false;
+ }
+ }
+}
Index: src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
===================================================================
--- src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java (revision 0)
+++ src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java (revision 0)
@@ -0,0 +1,66 @@
+package org.apache.hcatalog.metadata;
+
+public class HPartitionDoneEvent {
+
+ private String dbName;
+
+ private String tblName;
+
+ private String partVals;
+
+ private Long createTime;
+
+ /**
+ * @return the createTime
+ */
+ public Long getCreateTime() {
+ return createTime;
+ }
+
+ HPartitionDoneEvent(String dbName, String tblName, String partSpec) {
+ super();
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.partVals = partSpec;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "HPartitionDoneEvent [dbName=" + dbName + ", tblName=" + tblName
+ + ", partSpec=" + partVals + ", createTime=" + createTime + "]";
+ }
+
+ public HPartitionDoneEvent() {}
+
+ /**
+ * @param dbName the dbName to set
+ */
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+
+ /**
+ * @param tblName the tblName to set
+ */
+ public void setTblName(String tblName) {
+ this.tblName = tblName;
+ }
+
+ /**
+ * @param partSpec the partSpec to set
+ */
+ public void setPartSpec(String partSpec) {
+ this.partVals = partSpec;
+ }
+
+ /**
+ * @param createTime the createTime to set
+ */
+ public void setCreateTime(Long createTime) {
+ this.createTime = createTime;
+ }
+}
Index: src/java/org/apache/hcatalog/metadata/ormodel.jdo
===================================================================
--- src/java/org/apache/hcatalog/metadata/ormodel.jdo (revision 0)
+++ src/java/org/apache/hcatalog/metadata/ormodel.jdo (revision 0)
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+