Index: ivy.xml =================================================================== --- ivy.xml (revision 1127164) +++ ivy.xml (working copy) @@ -43,5 +43,8 @@ --> - + + + + Index: ivy/ivysettings.xml =================================================================== --- ivy/ivysettings.xml (revision 1127164) +++ ivy/ivysettings.xml (working copy) @@ -36,10 +36,17 @@ + + + Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1127164) +++ ivy/libraries.properties (working copy) @@ -18,4 +18,6 @@ pig.version=0.8.0 commons-cli.version=1.0 #hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven - +jms.version=1.1 +activemq.version=5.5.0 +javax-mgmt.version=1.1-rev-1 Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 0) +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 0) @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.thrift.TException; + +import junit.framework.TestCase; + +public class TestNotificationListener extends TestCase implements MessageListener{ + + private HiveConf hiveConf; + private Driver driver; + private AtomicInteger cntInvocation = new AtomicInteger(0); + + @Override + protected void setUp() throws Exception { + + super.setUp(); + System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Destination dbTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb"); + MessageConsumer consumer3 = session.createConsumer(dbTopic); + consumer3.setMessageListener(this); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + @Override + protected void tearDown() throws Exception { + assertEquals(6, cntInvocation.get()); + super.tearDown(); + } + + public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{ + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } + + @Override + public void onMessage(Message msg) { + cntInvocation.incrementAndGet(); + + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){ + + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + } + else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){ + + assertEquals("topic://HCAT.mydb",msg.getJMSDestination().toString()); + Table tbl = (Table)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } + else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){ + + assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString()); + Partition part = (Partition)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + assertEquals(vals,part.getValues()); + } + else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){ + + assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString()); + Partition part = (Partition)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + assertEquals(vals,part.getValues()); + } + else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){ + + assertEquals("topic://HCAT.mydb",msg.getJMSDestination().toString()); + Table tbl = (Table)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } + else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){ + + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + } + else + assert false; + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + } + } +} Index: src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java =================================================================== --- src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java (revision 0) +++ src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java (revision 0) @@ -0,0 +1,66 @@ +package org.apache.hcatalog.metadata; + +public class HPartitionDoneEvent { + + private String dbName; + + private String tblName; + + private String partVals; + + private Long createTime; + + /** + * @return the createTime + */ + public Long getCreateTime() { + return createTime; + } + + HPartitionDoneEvent(String dbName, String tblName, String partSpec) { + super(); + this.dbName = dbName; + this.tblName = tblName; + this.partVals = partSpec; + this.createTime = System.currentTimeMillis(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "HPartitionDoneEvent [dbName=" + dbName + ", tblName=" + tblName + + ", partSpec=" + partVals + ", createTime=" + createTime + "]"; + } + + public HPartitionDoneEvent() {} + + /** + * @param dbName the dbName to set + */ + public void setDbName(String dbName) { + this.dbName = dbName; + } + + /** + * @param tblName the tblName to set + */ + public void setTblName(String tblName) { + this.tblName = tblName; + } + + /** + * @param partSpec the partSpec to set + */ + public void setPartSpec(String partSpec) { + this.partVals = partSpec; + } + + /** + * @param createTime the createTime to set + */ + public void setCreateTime(Long createTime) { + this.createTime = createTime; + } +} Index: src/java/org/apache/hcatalog/metadata/ormodel.jdo =================================================================== --- src/java/org/apache/hcatalog/metadata/ormodel.jdo (revision 0) +++ src/java/org/apache/hcatalog/metadata/ormodel.jdo (revision 0) @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + +