Index: ivy.xml
===================================================================
--- ivy.xml (revision 1097660)
+++ ivy.xml (working copy)
@@ -43,5 +43,8 @@
-->
-
+
+
+
+
Index: ivy/ivysettings.xml
===================================================================
--- ivy/ivysettings.xml (revision 1097660)
+++ ivy/ivysettings.xml (working copy)
@@ -36,10 +36,17 @@
+
+
+
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties (revision 1097660)
+++ ivy/libraries.properties (working copy)
@@ -18,4 +18,6 @@
pig.version=0.8.0
commons-cli.version=1.0
#hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven
-
+jms.version=1.1
+activemq.version=5.4.2
+javax-mgmt.version=1.1-rev-1
Index: src/test/org/apache/hcatalog/listener/TestHCatListener.java
===================================================================
--- src/test/org/apache/hcatalog/listener/TestHCatListener.java (revision 0)
+++ src/test/org/apache/hcatalog/listener/TestHCatListener.java (revision 0)
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.thrift.TException;
+
+import junit.framework.TestCase;
+
+public class TestHCatListener extends TestCase implements MessageListener{
+
+ private static final String msPort = "20201";
+ private HiveConf hiveConf;
+ private HiveMetaStoreClient msc;
+ private Driver driver;
+ private AtomicInteger cntInvocation = new AtomicInteger(0);
+
+ private static class RunMS implements Runnable {
+
+ @Override
+ public void run() {
+ HiveMetaStore.main(new String[]{msPort});
+ }
+
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false");
+ System.setProperty("hive.metastore.event.listeners",HCatListener.class.getName());
+ ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC);
+ MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+ consumer1.setMessageListener(this);
+ Destination tblTopic = session.createTopic("mydb.mytbl");
+ MessageConsumer consumer2 = session.createConsumer(tblTopic);
+ consumer2.setMessageListener(this);
+ Thread t = new Thread(new RunMS());
+ t.start();
+ Thread.sleep(20000);
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ msc = new HiveMetaStoreClient(hiveConf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ assertEquals(7, cntInvocation.get());
+ super.tearDown();
+ }
+
+ public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{
+ driver.run("create database mydb");
+ driver.run("use mydb");
+ driver.run("create table mytbl (a string) partitioned by (b string)");
+ driver.run("alter table mytbl add partition(b='2011')");
+ List partSpec = new ArrayList(1);
+ partSpec.add("b='2011'");
+ HiveObjectRef obj = new HiveObjectRef(HiveObjectType.PARTITION,"mydb","mytbl",partSpec,null);
+ msc.sendMessage(obj, 0);
+ assertEquals(Boolean.toString(true), msc.recvMessage(obj, 0));
+ driver.run("alter table mytbl drop partition(b='2011')");
+ driver.run("drop table mytbl");
+ driver.run("drop database mydb");
+ }
+
+ @Override
+ public void onMessage(Message msg) {
+ cntInvocation.incrementAndGet();
+ String event;
+ try {
+ event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+
+ if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
+
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ }
+ else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
+
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ Table tbl = (Table)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ }
+ else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){
+
+ assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString());
+ Partition part = (Partition)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List vals = new ArrayList(1);
+ vals.add("2011");
+ assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString());
+ assertEquals(vals,part.getValues());
+ }
+ else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){
+
+ assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString());
+ Partition part = (Partition)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List vals = new ArrayList(1);
+ vals.add("2011");
+ assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString());
+ assertEquals(vals,part.getValues());
+ }
+ else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){
+
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ Table tbl = (Table)(((ObjectMessage)msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ }
+ else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
+
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ }
+ else if(event.equals(HCatConstants.HCAT_FINALIZE_PARTITON)){
+ assertEquals("topic://mydb.mytbl",msg.getJMSDestination().toString());
+ assertEquals("b='2011'", ((ObjectMessage)msg).getObject());
+ }
+ else
+ assert false;
+ } catch (JMSException e) {
+ e.printStackTrace(System.err);
+ assert false;
+ throw new RuntimeException(e);
+ }
+ }
+}
Index: src/java/org/apache/hcatalog/listener/HCatListener.java
===================================================================
--- src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0)
+++ src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0)
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.MessageEvent;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}
+ * It sends message on two type of topics. One has name of form dbName.tblName
+ * On this topic, two kind of messages are sent: add/drop partition and
+ * finalize_partition message.
+ * Second topic has name "HCAT" and messages sent on it are: add/drop database
+ * and add/drop table.
+ * All messages also has a property named "HCAT_EVENT" set on them whose value
+ * can be used to configure message selector on subscriber side.
+ */
+public class HCatListener extends MetaStoreEventListener{
+
+ private static final Log LOG = LogFactory.getLog(HCatListener.class);
+ private Session session;
+ private Connection conn;
+
+ /**
+ * Create message bus connection and session in constructor.
+ */
+ public HCatListener(final Configuration conf) {
+
+ super(conf);
+ try {
+ Context jndiCntxt = new InitialContext();
+ ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
+ conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ } catch (NamingException e) {
+ LOG.error("JNDI error while setting up Message Bus connection. " +
+ "Please make sure file named 'jndi.properties' is in " +
+ "classpath and contains appropriate key-value pairs.",e);
+ }
+ catch (JMSException e) {
+ LOG.error("Failed to initialize connection to message bus",e);
+ }
+ catch(Throwable t){
+ LOG.error("HCAT Listener failed to load",t);
+ }
+ }
+
+ @Override
+ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
+ // Subscriber can get notification of newly add partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
+ if(partitionEvent.getStatus()){
+ Partition partition = partitionEvent.getPartition();
+ send(partition, partition.getDbName()+"."+partition.getTableName(),
+ HCatConstants.HCAT_ADD_PARTITION_EVENT);
+ }
+
+ }
+
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+ // Subscriber can get notification of dropped partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION"
+
+ // Datanucleus throws NPE when we try to serialize a partition object
+ // retrieved from metastore. To workaround that we reset following objects
+
+ if(partitionEvent.getStatus()){
+ Partition partition = partitionEvent.getPartition();
+ StorageDescriptor sd = partition.getSd();
+ sd.setBucketCols(new ArrayList());
+ sd.setSortCols(new ArrayList());
+ sd.setParameters(new HashMap());
+ sd.getSerdeInfo().setParameters(new HashMap());
+ send(partition, partition.getDbName()+"."+partition.getTableName(),
+ HCatConstants.HCAT_DROP_PARTITION_EVENT);
+ }
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
+ // Subscriber can get notification about addition of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_DATABASE"
+ if(dbEvent.getStatus())
+ send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+ // Subscriber can get notification about drop of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_DATABASE"
+ if(dbEvent.getStatus())
+ send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+ // Subscriber can get notification about addition of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_TABLE"
+ if(tableEvent.getStatus())
+ send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_ADD_TABLE_EVENT);
+ }
+
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+ // Subscriber can get notification about drop of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_TABLE"
+
+ // Datanucleus throws NPE when we try to serialize a table object
+ // retrieved from metastore. To workaround that we reset following objects
+
+ if(tableEvent.getStatus()){
+ Table table = tableEvent.getTable();
+ StorageDescriptor sd = table.getSd();
+ sd.setBucketCols(new ArrayList());
+ sd.setSortCols(new ArrayList());
+ sd.setParameters(new HashMap());
+ sd.getSerdeInfo().setParameters(new HashMap());
+ send(table,HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_DROP_TABLE_EVENT);
+ }
+ }
+
+ /**
+ * @param msgBody is the metastore object. It is sent in full such that
+ * if subscriber is really interested in details, it can reconstruct it fully.
+ * In case of finalize_partition message this will be string specification of
+ * the partition.
+ * @param topicName is the name on message broker on which message is sent.
+ * @param event is the value of HCAT_EVENT property in message. It can be
+ * used to select messages in client side.
+ */
+ private void send(Serializable msgBody, String topicName, String event){
+
+ if(null == session){
+ // If we weren't able to setup the session in the constructor
+ // we cant send message in any case.
+ LOG.error("Invalid session. Failed to send message on topic: "+
+ topicName + " event: "+event);
+ return;
+ }
+
+ try{
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ Destination topic = session.createTopic(topicName);
+ MessageProducer producer = session.createProducer(topic);
+ ObjectMessage msg = session.createObjectMessage(msgBody);
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+ producer.send(msg);
+ // Message must be transacted before we return.
+ session.commit();
+ } catch(Exception e){
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: "+topicName +
+ " event: "+event , e);
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ // Close the connection before dying.
+ try {
+ if(conn != null) {
+ conn.close();
+ }
+ } catch (Exception ignore) {
+ LOG.info("Failed to close message bus connection.", ignore);
+ }
+ }
+
+ @Override
+ public boolean canProcessRecvMessage(int msgType) {
+ return true;
+ }
+
+ @Override
+ public boolean canProcessSendMessage(int msgType) {
+ return true;
+ }
+
+ @Override
+ public String processRecvMessage(MessageEvent msg) throws MetaException {
+ final HiveObjectRef obj = msg.getHiveObj();
+ String retVal = Boolean.toString(false);
+ try {
+ retVal = msg.getHandler().executeWithRetry(new Command() {
+ @Override
+ public String run(RawStore ms) throws Exception {
+ MPartitionDoneEvent mpart = null;
+ Boolean commited = false;
+ try {
+ ms.openTransaction();
+ String dbName = obj.getDbName().toLowerCase().trim();
+ String tableName = obj.getObjectName().toLowerCase().trim();
+
+ // Change the query to use part_vals instead of the name which is
+ // redundant
+ PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager();
+ Query query = pm.newQuery(MPartitionDoneEvent.class,
+ "dbName == t1 && tblName == t2 && partSpec == t3");
+ query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3");
+ query.setUnique(true);
+ mpart = (MPartitionDoneEvent) query.execute(tableName, dbName, StringUtils.join(obj.getPartValues(),','));
+ pm.retrieve(mpart);
+ commited = ms.commitTransaction();
+ } finally {
+ if (!commited) {
+ ms.rollbackTransaction();
+ }
+ }
+ return commited.toString();
+ }
+ });
+ return retVal;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return retVal;
+ }
+ }
+
+ @Override
+ public void processSendMessage(MessageEvent msg) throws MetaException {
+ final HiveObjectRef obj = msg.getHiveObj();
+ try {
+ msg.getHandler().executeWithRetry(new Command() {
+ @Override
+ public Boolean run(RawStore ms) throws Exception {
+ PersistenceManager pm = ((ObjectStore)ms).getPersistenceManager();
+ ms.openTransaction();
+ pm.makePersistent(new MPartitionDoneEvent(
+ obj.getDbName(),obj.getObjectName(),StringUtils.join(obj.getPartValues(),',')));
+ ms.commitTransaction();
+ return Boolean.TRUE;
+ }
+ });
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ send(StringUtils.join(obj.getPartValues(),','), obj.getDbName()+"."+obj.getObjectName(), HCatConstants.HCAT_FINALIZE_PARTITON);
+ }
+}
Index: src/java/org/apache/hcatalog/listener/MPartitionDoneEvent.java
===================================================================
--- src/java/org/apache/hcatalog/listener/MPartitionDoneEvent.java (revision 0)
+++ src/java/org/apache/hcatalog/listener/MPartitionDoneEvent.java (revision 0)
@@ -0,0 +1,47 @@
+package org.apache.hcatalog.listener;
+
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+
+@PersistenceCapable
+public class MPartitionDoneEvent {
+
+ @Persistent
+ private String dbName;
+
+ @Persistent
+ private String tblName;
+
+ @Persistent
+ private String partSpec;
+
+ MPartitionDoneEvent(String dbName, String tblName, String partSpec) {
+ super();
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.partSpec = partSpec;
+ }
+
+ public MPartitionDoneEvent() {}
+
+ /**
+ * @param dbName the dbName to set
+ */
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+
+ /**
+ * @param tblName the tblName to set
+ */
+ public void setTblName(String tblName) {
+ this.tblName = tblName;
+ }
+
+ /**
+ * @param partSpec the partSpec to set
+ */
+ public void setPartSpec(String partSpec) {
+ this.partSpec = partSpec;
+ }
+}
Index: src/java/org/apache/hcatalog/common/HCatConstants.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1097660)
+++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy)
@@ -64,4 +64,15 @@
public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
+
+ public static final String HCAT_TOPIC = "HCAT";
+ public static final String HCAT_EVENT = "HCAT_EVENT";
+ public static final String HCAT_FINALIZE_PARTITON = "HCAT_FINALIZE_PARTITON";
+
+ public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
+ public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
+ public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
+ public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
+ public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";
+ public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE";
}
Index: build.xml
===================================================================
--- build.xml (revision 1097660)
+++ build.xml (working copy)
@@ -92,15 +92,20 @@
-
+
+
-
+
+
-
+
+
+
+
@@ -217,6 +222,7 @@
+
@@ -230,6 +236,31 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+