Index: src/test/org/apache/hcatalog/listener/TestCleanserThread.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestCleanserThread.java (revision 0) +++ src/test/org/apache/hcatalog/listener/TestCleanserThread.java (revision 0) @@ -0,0 +1,53 @@ +package org.apache.hcatalog.listener; + +import java.util.Arrays; +import java.util.Collections; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.thrift.TException; + +import junit.framework.TestCase; + +public class TestCleanserThread extends TestCase { + + private HiveMetaStoreClient client; + + protected void setUp() throws Exception { + super.setUp(); + + HiveConf conf = new HiveConf(getClass()); + conf.setLong(HCatConstants.HCAT_MSG_CLEAN_FREQ, 10); + conf.setLong(HCatConstants.HCAT_MSG_EXPIRY_DURATION, 1); + conf.setVar(ConfVars.METASTORE_EVENT_LISTENERS, NotificationListener.class.getName()); + System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + conf.setBoolean("hive.metastore.local", true); + client = new HiveMetaStoreClient(conf); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testCleanserThread() throws MetaException, TException, InterruptedException{ + + HiveObjectRef hiveObject = new HiveObjectRef(HiveObjectType.GLOBAL, "tmpdb", "tmptbl", Arrays.asList(new String[]{"k1=v1"}), "colname"); + client.sendMessage(new HiveObjectRef(HiveObjectType.GLOBAL, "tmpdb", "tmptbl", Arrays.asList(new String[]{"k1=v1"}), "colname"), 0); + assertEquals("true",client.recvMessage(hiveObject, 0)); + Thread.sleep(10000); + assertEquals("false",client.recvMessage(new HiveObjectRef(HiveObjectType.GLOBAL, "tmpdb", "tmptbl", Arrays.asList(new String[]{"k1=v1"}), "colname"), 0)); + } +} Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1130631) +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (working copy) @@ -35,8 +35,10 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; @@ -53,6 +55,7 @@ public class TestNotificationListener extends TestCase implements MessageListener{ private HiveConf hiveConf; + private HiveMetaStoreClient msc; private Driver driver; private AtomicInteger cntInvocation = new AtomicInteger(0); @@ -62,6 +65,7 @@ super.setUp(); System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); + System.setProperty("hive.metastore.event.listeners",NotificationListener.class.getName()); ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); Connection conn = connFac.createConnection(); conn.start(); @@ -78,18 +82,18 @@ MessageConsumer consumer3 = session.createConsumer(dbTopic); consumer3.setMessageListener(this); hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); hiveConf.set("hive.metastore.local", "true"); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); SessionState.start(new CliSessionState(hiveConf)); driver = new Driver(hiveConf); + msc = new HiveMetaStoreClient(hiveConf); } @Override protected void tearDown() throws Exception { - assertEquals(6, cntInvocation.get()); + assertEquals(7, cntInvocation.get()); super.tearDown(); } @@ -98,6 +102,11 @@ driver.run("use mydb"); driver.run("create table mytbl (a string) partitioned by (b string)"); driver.run("alter table mytbl add partition(b='2011')"); + List partSpec = new ArrayList(1); + partSpec.add("b='2011'"); + HiveObjectRef obj = new HiveObjectRef(HiveObjectType.PARTITION,"mydb","mytbl",partSpec,null); + msc.sendMessage(obj, 0); + assertEquals(Boolean.toString(true), msc.recvMessage(obj, 0)); driver.run("alter table mytbl drop partition(b='2011')"); driver.run("drop table mytbl"); driver.run("drop database mydb"); @@ -106,7 +115,6 @@ @Override public void onMessage(Message msg) { cntInvocation.incrementAndGet(); - String event; try { event = msg.getStringProperty(HCatConstants.HCAT_EVENT); @@ -156,11 +164,16 @@ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString()); assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); } + else if(event.equals(HCatConstants.HCAT_FINALIZE_PARTITON)){ + assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString()); + assertEquals("b='2011'", ((ObjectMessage)msg).getObject()); + } else assert false; } catch (JMSException e) { e.printStackTrace(System.err); assert false; + throw new RuntimeException(e); } } } Index: src/java/org/apache/hcatalog/listener/CleanserThread.java =================================================================== --- src/java/org/apache/hcatalog/listener/CleanserThread.java (revision 0) +++ src/java/org/apache/hcatalog/listener/CleanserThread.java (revision 0) @@ -0,0 +1,81 @@ +package org.apache.hcatalog.listener; + +import javax.jdo.PersistenceManager; +import javax.jdo.Query; + +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command; + +/** This is a daemon cleaner thread started at metastore startup time. It loops + * forever, wakes up every 6 hrs and delete all the MSD messages which are + * more then a week old. + */ + +public class CleanserThread extends Thread{ + + private HMSHandler handler; + private long timeBetweenRuns; + private long expiryTime; + + /** + * @param timeBetweenRuns in minutes + * @param expiryTime in minutes + */ + public CleanserThread(long timeBetweenRuns, long expiryTime) { + super(); + // scale them to milliseconds + this.timeBetweenRuns = timeBetweenRuns * 1000; + this.expiryTime = expiryTime * 1000; + } + + + @Override + public void run() { + + while(true){ + + try { + // Sleep for 6 hrs before next run. + Thread.sleep(timeBetweenRuns); + System.err.println("Woke up in cleaner thread"); + if(null == handler) + continue; + final Long curTime = System.currentTimeMillis(); + handler.executeWithRetry(new Command(){ + @Override + public Long run(RawStore ms) throws Exception { + Boolean commited = false; + long delCnt; + try { + ms.openTransaction(); + PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager(); + Query query = pm.newQuery(HPartitionDoneEvent.class, + "curTime - createTime > expiryTime"); + query.declareParameters("java.lang.Long curTime, java.lang.Long expiryTime"); + delCnt = query.deletePersistentAll(curTime, expiryTime); + commited = ms.commitTransaction(); + } + finally { + if (!commited) { + ms.rollbackTransaction(); + } + } + return delCnt; + } + }); + + } catch (Exception e) { + e.printStackTrace(System.err); + } + } + } + + /** + * @param handler the handler to set + */ + public void setHandler(HMSHandler handler) { + this.handler = handler; + } +} Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1130631) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -58,6 +58,7 @@ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.MessageEvent; import org.apache.hcatalog.common.HCatConstants; /** @@ -75,6 +76,7 @@ private static final Log LOG = LogFactory.getLog(NotificationListener.class); private Session session; private Connection conn; + private CleanserThread cleanerTh; /** * Create message bus connection and session in constructor. @@ -91,6 +93,10 @@ // transacted mode. session = conn.createSession(true, Session.SESSION_TRANSACTED); + cleanerTh = new CleanserThread(conf.getLong(HCatConstants.HCAT_MSG_CLEAN_FREQ, 60*6*60), + conf.getLong(HCatConstants.HCAT_MSG_EXPIRY_DURATION, 60*24*7*60)); + cleanerTh.setDaemon(true); + cleanerTh.start(); } catch (NamingException e) { LOG.error("JNDI error while setting up Message Bus connection. " + "Please make sure file named 'jndi.properties' is in " + @@ -175,12 +181,11 @@ // by listening on a topic named "HCAT" and message selector string // as "HCAT_EVENT = HCAT_ADD_TABLE" if(tableEvent.getStatus()){ - if(tableEvent.getStatus()){ Table tbl = tableEvent.getTable(); Table newTbl = tbl.deepCopy(); HMSHandler handler = tableEvent.getHandler(); String namingPolicy = handler.getHiveConf().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAMING_POLICY, "tablename"); - newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, getTopicNameForParts(namingPolicy, tbl.getDbName(), tbl.getTableName())); + newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, getTopicNameForPartition(namingPolicy, tbl.getDbName(), tbl.getTableName())); try { handler.alter_table(tbl.getDbName(), tbl.getTableName(), newTbl); } catch (InvalidOperationException e) { @@ -188,10 +193,10 @@ } send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC+"."+tbl.getDbName(), HCatConstants.HCAT_ADD_TABLE_EVENT); } + } - } - private String getTopicNameForParts(String namingPolicy, String dbName, String tblName){ + private String getTopicNameForPartition(String namingPolicy, String dbName, String tblName){ // we only have one policy now, so ignore policy param for now. return HCatConstants.HCAT_TOPIC+"."+dbName+"."+tblName; } @@ -263,4 +268,77 @@ LOG.info("Failed to close message bus connection.", ignore); } } + + @Override + public boolean canProcessRecvMessage(int msgType) { + return true; } + + @Override + public boolean canProcessSendMessage(int msgType) { + return true; + } + + @Override + public String processRecvMessage(MessageEvent msg) throws MetaException { + final HiveObjectRef obj = msg.getHiveObj(); + try { + return msg.getHandler().executeWithRetry(new Command() { + @Override + public String run(RawStore ms) throws Exception { + Boolean commited = false; + String ret = Boolean.toString(false); + try { + ms.openTransaction(); + PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager(); + Query query = pm.newQuery(HPartitionDoneEvent.class, + "dbName == t1 && tblName == t2 && partSpec == t3"); + query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3"); + query.setUnique(true); + HPartitionDoneEvent mpart = (HPartitionDoneEvent) query.execute(obj.getDbName(), obj.getObjectName(), StringUtils.join(obj.getPartValues(),',')); + if(null != mpart ) + ret = Boolean.toString(true); + commited = ms.commitTransaction(); + + } finally { + if (!commited) { + ms.rollbackTransaction(); + } + } + return ret; + } + }); + } catch (Exception e) { + throw new MetaException(e.toString()); + } + } + + @Override + public void processSendMessage(MessageEvent msg) throws MetaException { + final HiveObjectRef obj = msg.getHiveObj(); + final HMSHandler handler = msg.getHandler(); + cleanerTh.setHandler(handler); + // Persist messages in backend objectstore. + try { + if(!handler.executeWithRetry(new Command() { + @Override + public Boolean run(RawStore ms) throws Exception { + PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager(); + ms.openTransaction(); + pm.makePersistent(new HPartitionDoneEvent( + obj.getDbName(),obj.getObjectName(),StringUtils.join(obj.getPartValues(),','))); + return ms.commitTransaction(); + } + })){ + throw new MetaException("Failed to persist MSD message."); + } + } catch (Exception e) { + throw new MetaException(e.toString()); + } + + + // also send message to Message Bus. + send(StringUtils.join(obj.getPartValues(),','), obj.getDbName()+"."+obj.getObjectName(), HCatConstants.HCAT_FINALIZE_PARTITON); + + } +} Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1130631) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -65,19 +65,21 @@ public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf"; public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig"; - public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq"; - public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration"; - - public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name"; - public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy"; - // Message Bus related properties. public static final String HCAT_TOPIC = "HCAT"; public static final String HCAT_EVENT = "HCAT_EVENT"; + public static final String HCAT_FINALIZE_PARTITON = "HCAT_FINALIZE_PARTITON"; + public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION"; public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE"; public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE"; public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE"; public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE"; + + public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq"; + public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration"; + + public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name"; + public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy"; }