Index: ivy.xml =================================================================== --- ivy.xml (revision 1097660) +++ ivy.xml (working copy) @@ -43,5 +43,8 @@ --> - + + + + Index: ivy/ivysettings.xml =================================================================== --- ivy/ivysettings.xml (revision 1097660) +++ ivy/ivysettings.xml (working copy) @@ -36,10 +36,17 @@ + + + Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1097660) +++ ivy/libraries.properties (working copy) @@ -18,4 +18,6 @@ pig.version=0.8.0 commons-cli.version=1.0 #hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven - +jms.version=1.1 +activemq.version=5.4.2 +javax-mgmt.version=1.1-rev-1 Index: src/test/org/apache/hcatalog/listener/TestHCatListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestHCatListener.java (revision 0) +++ src/test/org/apache/hcatalog/listener/TestHCatListener.java (revision 0) @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.thrift.TException; + +import junit.framework.TestCase; + +public class TestHCatListener extends TestCase implements MessageListener{ + + private static final String msPort = "20201"; + private HiveConf hiveConf; + private HiveMetaStoreClient msc; + private Driver driver; + private AtomicInteger cntInvocation = new AtomicInteger(0); + + private static class RunMS implements Runnable { + + @Override + public void run() { + HiveMetaStore.main(new String[]{msPort}); + } + + } + + @Override + protected void setUp() throws Exception { + + super.setUp(); + System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); + System.setProperty("hive.metastore.event.listeners",HCatListener.class.getName()); + ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session.createTopic("mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Thread t = new Thread(new RunMS()); + t.start(); + Thread.sleep(20000); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set("hive.metastore.local", "false"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + msc = new HiveMetaStoreClient(hiveConf); + } + + @Override + protected void tearDown() throws Exception { + assertEquals(7, cntInvocation.get()); + super.tearDown(); + } + + public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{ + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + List partSpec = new ArrayList(1); + partSpec.add("b='2011'"); + HiveObjectRef obj = new HiveObjectRef(HiveObjectType.PARTITION,"mydb","mytbl",partSpec,null); + msc.sendMessage(obj, 0); + assertEquals(Boolean.toString(true), msc.recvMessage(obj, 0)); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } + + @Override + public void onMessage(Message msg) { + cntInvocation.incrementAndGet(); + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + + if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){ + + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + } + else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){ + + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + Table tbl = (Table)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } + else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){ + + assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString()); + Partition part = (Partition)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString()); + assertEquals(vals,part.getValues()); + } + else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){ + + assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString()); + Partition part = (Partition)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List vals = new ArrayList(1); + vals.add("2011"); + assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString()); + assertEquals(vals,part.getValues()); + } + else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){ + + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + Table tbl = (Table)(((ObjectMessage)msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } + else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){ + + assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); + assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); + } + else if(event.equals(HCatConstants.HCAT_FINALIZE_PARTITON)){ + assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString()); + assertEquals("b='2011'", ((ObjectMessage)msg).getObject()); + } + else + assert false; + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + throw new RuntimeException(e); + } + } +} Index: src/java/org/apache/hcatalog/listener/HCatListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0) +++ src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0) @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.listener; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; + +import javax.jdo.PersistenceManager; +import javax.jdo.Query; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command; +import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.MessageEvent; +import org.apache.hcatalog.common.HCatConstants; + +/** + * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} + * It sends message on two type of topics. One has name of form dbName.tblName + * On this topic, two kind of messages are sent: add/drop partition and + * finalize_partition message. + * Second topic has name "HCAT" and messages sent on it are: add/drop database + * and add/drop table. + * All messages also has a property named "HCAT_EVENT" set on them whose value + * can be used to configure message selector on subscriber side. + */ +public class HCatListener extends MetaStoreEventListener{ + + private static final Log LOG = LogFactory.getLog(HCatListener.class); + private Session session; + private Connection conn; + + /** + * Create message bus connection and session in constructor. + */ + public HCatListener(final Configuration conf) { + + super(conf); + try { + Context jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory"); + conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + session = conn.createSession(true, Session.SESSION_TRANSACTED); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.",e); + } + catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus",e); + } + catch(Throwable t){ + LOG.error("HCAT Listener failed to load",t); + } + } + + @Override + public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + if(partitionEvent.getStatus()){ + Partition partition = partitionEvent.getPartition(); + send(partition, partition.getDbName()+"."+partition.getTableName(), + HCatConstants.HCAT_ADD_PARTITION_EVENT); + } + + } + + @Override + public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { + // Subscriber can get notification of dropped partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" + + // Datanucleus throws NPE when we try to serialize a partition object + // retrieved from metastore. To workaround that we reset following objects + + if(partitionEvent.getStatus()){ + Partition partition = partitionEvent.getPartition(); + StorageDescriptor sd = partition.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + send(partition, partition.getDbName()+"."+partition.getTableName(), + HCatConstants.HCAT_DROP_PARTITION_EVENT); + } + } + + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about addition of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_DATABASE" + if(dbEvent.getStatus()) + send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT); + } + + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about drop of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_DATABASE" + if(dbEvent.getStatus()) + send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT); + } + + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about addition of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_TABLE" + if(tableEvent.getStatus()) + send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_ADD_TABLE_EVENT); + } + + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about drop of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_TABLE" + + // Datanucleus throws NPE when we try to serialize a table object + // retrieved from metastore. To workaround that we reset following objects + + if(tableEvent.getStatus()){ + Table table = tableEvent.getTable(); + StorageDescriptor sd = table.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + send(table,HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_DROP_TABLE_EVENT); + } + } + + /** + * @param msgBody is the metastore object. It is sent in full such that + * if subscriber is really interested in details, it can reconstruct it fully. + * In case of finalize_partition message this will be string specification of + * the partition. + * @param topicName is the name on message broker on which message is sent. + * @param event is the value of HCAT_EVENT property in message. It can be + * used to select messages in client side. + */ + private void send(Serializable msgBody, String topicName, String event){ + + if(null == session){ + // If we weren't able to setup the session in the constructor + // we cant send message in any case. + LOG.error("Invalid session. Failed to send message on topic: "+ + topicName + " event: "+event); + return; + } + + try{ + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + Destination topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + ObjectMessage msg = session.createObjectMessage(msgBody); + msg.setStringProperty(HCatConstants.HCAT_EVENT, event); + producer.send(msg); + // Message must be transacted before we return. + session.commit(); + } catch(Exception e){ + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: "+topicName + + " event: "+event , e); + } + } + + @Override + protected void finalize() throws Throwable { + // Close the connection before dying. + try { + if(conn != null) { + conn.close(); + } + } catch (Exception ignore) { + LOG.info("Failed to close message bus connection.", ignore); + } + } + + @Override + public boolean canProcessRecvMessage(int msgType) { + return true; + } + + @Override + public boolean canProcessSendMessage(int msgType) { + return true; + } + + @Override + public String processRecvMessage(MessageEvent msg) throws MetaException { + final HiveObjectRef obj = msg.getHiveObj(); + String retVal = Boolean.toString(false); + try { + retVal = msg.getHandler().executeWithRetry(new Command() { + @Override + public String run(RawStore ms) throws Exception { + MPartitionDoneEvent mpart = null; + Boolean commited = false; + try { + ms.openTransaction(); + String dbName = obj.getDbName().toLowerCase().trim(); + String tableName = obj.getObjectName().toLowerCase().trim(); + + // Change the query to use part_vals instead of the name which is + // redundant + PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager(); + Query query = pm.newQuery(MPartitionDoneEvent.class, + "dbName == t1 && tblName == t2 && partSpec == t3"); + query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3"); + query.setUnique(true); + mpart = (MPartitionDoneEvent) query.execute(tableName, dbName, StringUtils.join(obj.getPartValues(),',')); + pm.retrieve(mpart); + commited = ms.commitTransaction(); + } finally { + if (!commited) { + ms.rollbackTransaction(); + } + } + return commited.toString(); + } + }); + return retVal; + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return retVal; + } + } + + @Override + public void processSendMessage(MessageEvent msg) throws MetaException { + final HiveObjectRef obj = msg.getHiveObj(); + try { + msg.getHandler().executeWithRetry(new Command() { + @Override + public Boolean run(RawStore ms) throws Exception { + PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager(); + ms.openTransaction(); + pm.makePersistent(new MPartitionDoneEvent( + obj.getDbName(),obj.getObjectName(),StringUtils.join(obj.getPartValues(),','))); + ms.commitTransaction(); + return Boolean.TRUE; + } + }); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + send(StringUtils.join(obj.getPartValues(),','), obj.getDbName()+"."+obj.getObjectName(), HCatConstants.HCAT_FINALIZE_PARTITON); + } +} Index: src/java/org/apache/hcatalog/listener/MPartitionDoneEvent.java =================================================================== --- src/java/org/apache/hcatalog/listener/MPartitionDoneEvent.java (revision 0) +++ src/java/org/apache/hcatalog/listener/MPartitionDoneEvent.java (revision 0) @@ -0,0 +1,47 @@ +package org.apache.hcatalog.listener; + +import javax.jdo.annotations.PersistenceCapable; +import javax.jdo.annotations.Persistent; + +@PersistenceCapable +public class MPartitionDoneEvent { + + @Persistent + private String dbName; + + @Persistent + private String tblName; + + @Persistent + private String partSpec; + + MPartitionDoneEvent(String dbName, String tblName, String partSpec) { + super(); + this.dbName = dbName; + this.tblName = tblName; + this.partSpec = partSpec; + } + + public MPartitionDoneEvent() {} + + /** + * @param dbName the dbName to set + */ + public void setDbName(String dbName) { + this.dbName = dbName; + } + + /** + * @param tblName the tblName to set + */ + public void setTblName(String tblName) { + this.tblName = tblName; + } + + /** + * @param partSpec the partSpec to set + */ + public void setPartSpec(String partSpec) { + this.partSpec = partSpec; + } +} Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1097660) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -64,4 +64,15 @@ public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info"; public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf"; public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig"; + + public static final String HCAT_TOPIC = "HCAT"; + public static final String HCAT_EVENT = "HCAT_EVENT"; + public static final String HCAT_FINALIZE_PARTITON = "HCAT_FINALIZE_PARTITON"; + + public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; + public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION"; + public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE"; + public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE"; + public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE"; + public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE"; } Index: build.xml =================================================================== --- build.xml (revision 1097660) +++ build.xml (working copy) @@ -92,15 +92,20 @@ - + + - + + - + + + + @@ -217,6 +222,7 @@ + @@ -230,6 +236,31 @@ + + + + + + + + + + + + + + + + + + +