Index: conf/jndi.properties
===================================================================
--- conf/jndi.properties (revision 0)
+++ conf/jndi.properties (revision 0)
@@ -0,0 +1,36 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# If ActiveMQ is used then uncomment following properties, else substitute it accordingly.
+#java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
+
+# use the following property to provide location of MQ broker.
+#java.naming.provider.url = tcp://localhost:61616
+
+# use the following property to specify the JNDI name the connection factory
+# should appear as.
+#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
+
+# register some queues in JNDI using the form
+# queue.[jndiName] = [physicalName]
+# queue.MyQueue = example.MyQueue
+
+
+# register some topics in JNDI using the form
+# topic.[jndiName] = [physicalName]
+# topic.MyTopic = example.MyTopic
+
Index: ivy.xml
===================================================================
--- ivy.xml (revision 1095571)
+++ ivy.xml (working copy)
@@ -43,5 +43,6 @@
-->
-
+
+
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties (revision 1095571)
+++ ivy/libraries.properties (working copy)
@@ -18,4 +18,4 @@
pig.version=0.8.0
commons-cli.version=1.0
#hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven
-
+jms.version=1.1
Index: ivy/ivysettings.xml
===================================================================
--- ivy/ivysettings.xml (revision 1095571)
+++ ivy/ivysettings.xml (working copy)
@@ -36,10 +36,17 @@
+
+
+
Index: src/java/org/apache/hcatalog/listener/HCatListener.java
===================================================================
--- src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0)
+++ src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0)
@@ -0,0 +1,173 @@
+package org.apache.hcatalog.listener;
+
+import java.io.Serializable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.NoOpListener;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}
+ * It sends message on two type of topics. One has name of form dbName.tblName
+ * On this topic, two kind of messages are sent: add/drop partition and
+ * finalize_partition message.
+ * Second topic has name "HCAT" and messages sent on it are: add/drop database
+ * and add/drop table.
+ * All messages also has a property named "HCAT_EVENT" set on them whose value
+ * can be used to configure message selector on subscriber side.
+ */
+public class HCatListener extends NoOpListener{
+
+ private static final Log LOG = LogFactory.getLog(HCatListener.class);
+ private Session session;
+ private Connection conn;
+
+ /**
+ * Create message bus connection and session in constructor.
+ */
+ public HCatListener() {
+
+ try {
+ Context jndiCntxt = new InitialContext();
+ ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
+ conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ } catch (NamingException e) {
+ LOG.error("JNDI error while setting up Message Bus connection. " +
+ "Please make sure file named 'jndi.properties' is in " +
+ "classpath and contains appropriate key-value pairs.",e);
+ }
+ catch (JMSException e) {
+ LOG.error("Failed to initialize connection to message bus",e);
+ }
+ }
+
+ @Override
+ public void onAddPartition(Partition partition) throws MetaException {
+ // Subscriber can get notification of newly add partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION"
+ send(partition, partition.getDbName()+"."+partition.getTableName(),
+ HCatConstants.HCAT_ADD_PARTITION_EVENT);
+ }
+
+ @Override
+ public void onDropPartition(Partition partition) throws MetaException {
+ // Subscriber can get notification of dropped partition in a
+ // particular table by listening on a topic named "dbName.tableName"
+ // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION"
+ send(partition, partition.getDbName()+"."+partition.getTableName(),
+ HCatConstants.HCAT_DROP_PARTITION_EVENT);
+ }
+
+ @Override
+ public void onCreateDatabase(Database db) throws MetaException {
+ // Subscriber can get notification about addition of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_DATABASE"
+ send(db,HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onDropDatabase(Database db) throws MetaException {
+ // Subscriber can get notification about drop of a database in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_DATABASE"
+ send(db,HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ }
+
+ @Override
+ public void onCreateTable(Table table) throws MetaException {
+ // Subscriber can get notification about addition of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_ADD_TABLE"
+ send(table,HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_ADD_TABLE_EVENT);
+ }
+
+ @Override
+ public void onDropTable(Table table) throws MetaException {
+ // Subscriber can get notification about drop of a table in HCAT
+ // by listening on a topic named "HCAT" and message selector string
+ // as "HCAT_EVENT = HCAT_DROP_TABLE"
+ send(table,HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_DROP_TABLE_EVENT);
+ }
+
+ @Override
+ public void onFinalizePartition(String dbName, String tblName,
+ String partName) throws MetaException {
+ // Subscriber can get notification about partition finalization by
+ // listening on topic named "dbName.tableName" and message selector string
+ // as "HCAT_EVENT = HCAT_FINALIZE_PARTITION". Message body in this case
+ // will contain partition specification.
+ send(partName,dbName+"."+tblName,HCatConstants.HCAT_FINALIZE_PARTITON);
+ }
+
+ /**
+ * @param msgBody is the metastore object. It is sent in full such that
+ * if subscriber is really interested in details, it can reconstruct it fully.
+ * In case of finalize_partition message this will be string specification of
+ * the partition.
+ * @param topicName is the name on message broker on which message is sent.
+ * @param event is the value of HCAT_EVENT property in message. It can be
+ * used to select messages in client side.
+ */
+ private void send(Serializable msgBody, String topicName, String event){
+
+ if(null == session){
+ // If we weren't able to setup the session in the constructor
+ // we cant send message in any case.
+ LOG.error("Invalid session. Failed to send message on topic: "+
+ topicName + " event: "+event+ " containing message: "+msgBody);
+ return;
+ }
+
+ try{
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ Destination topic = session.createTopic(topicName);
+ MessageProducer producer = session.createProducer(topic);
+ ObjectMessage msg = session.createObjectMessage(msgBody);
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+ producer.send(msg);
+ // Message must be transacted before we return.
+ session.commit();
+
+ } catch(Exception e){
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: "+topicName +
+ " event: "+event+ " containing message: "+msgBody, e);
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ // Close the connection before dying.
+ try {
+ if(conn != null) {
+ conn.close();
+ }
+ } catch (Exception ignore) {
+ LOG.info("Failed to close message bus connection.", ignore);
+ }
+ }
+}
\ No newline at end of file
Index: src/java/org/apache/hcatalog/common/HCatConstants.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1095571)
+++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy)
@@ -64,4 +64,16 @@
public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
+
+ public static final String HCAT_TOPIC = "HCAT";
+ public static final String HCAT_EVENT = "HCAT_EVENT";
+ public static final String HCAT_FINALIZE_PARTITON = "HCAT_FINALIZE_PARTITON";
+
+ public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
+ public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
+ public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
+ public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
+ public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";
+ public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE";
+
}