Index: conf/jndi.properties =================================================================== --- conf/jndi.properties (revision 0) +++ conf/jndi.properties (revision 0) @@ -0,0 +1,36 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# If ActiveMQ is used then uncomment following properties, else substitute it accordingly. +#java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory + +# use the following property to provide location of MQ broker. +#java.naming.provider.url = tcp://localhost:61616 + +# use the following property to specify the JNDI name the connection factory +# should appear as. +#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +# queue.MyQueue = example.MyQueue + + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +# topic.MyTopic = example.MyTopic + Index: ivy.xml =================================================================== --- ivy.xml (revision 1095571) +++ ivy.xml (working copy) @@ -43,5 +43,6 @@ --> - + + Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1095571) +++ ivy/libraries.properties (working copy) @@ -18,4 +18,4 @@ pig.version=0.8.0 commons-cli.version=1.0 #hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven - +jms.version=1.1 Index: ivy/ivysettings.xml =================================================================== --- ivy/ivysettings.xml (revision 1095571) +++ ivy/ivysettings.xml (working copy) @@ -36,10 +36,17 @@ + + + Index: src/java/org/apache/hcatalog/listener/HCatListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0) +++ src/java/org/apache/hcatalog/listener/HCatListener.java (revision 0) @@ -0,0 +1,173 @@ +package org.apache.hcatalog.listener; + +import java.io.Serializable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.NoOpListener; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hcatalog.common.HCatConstants; + +/** + * Implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} + * It sends message on two type of topics. One has name of form dbName.tblName + * On this topic, two kind of messages are sent: add/drop partition and + * finalize_partition message. + * Second topic has name "HCAT" and messages sent on it are: add/drop database + * and add/drop table. + * All messages also has a property named "HCAT_EVENT" set on them whose value + * can be used to configure message selector on subscriber side. + */ +public class HCatListener extends NoOpListener{ + + private static final Log LOG = LogFactory.getLog(HCatListener.class); + private Session session; + private Connection conn; + + /** + * Create message bus connection and session in constructor. + */ + public HCatListener() { + + try { + Context jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory"); + conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + session = conn.createSession(true, Session.SESSION_TRANSACTED); + + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.",e); + } + catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus",e); + } + } + + @Override + public void onAddPartition(Partition partition) throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + send(partition, partition.getDbName()+"."+partition.getTableName(), + HCatConstants.HCAT_ADD_PARTITION_EVENT); + } + + @Override + public void onDropPartition(Partition partition) throws MetaException { + // Subscriber can get notification of dropped partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" + send(partition, partition.getDbName()+"."+partition.getTableName(), + HCatConstants.HCAT_DROP_PARTITION_EVENT); + } + + @Override + public void onCreateDatabase(Database db) throws MetaException { + // Subscriber can get notification about addition of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_DATABASE" + send(db,HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT); + } + + @Override + public void onDropDatabase(Database db) throws MetaException { + // Subscriber can get notification about drop of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_DATABASE" + send(db,HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT); + } + + @Override + public void onCreateTable(Table table) throws MetaException { + // Subscriber can get notification about addition of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_TABLE" + send(table,HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_ADD_TABLE_EVENT); + } + + @Override + public void onDropTable(Table table) throws MetaException { + // Subscriber can get notification about drop of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_TABLE" + send(table,HCatConstants.HCAT_TOPIC, HCatConstants.HCAT_DROP_TABLE_EVENT); + } + + @Override + public void onFinalizePartition(String dbName, String tblName, + String partName) throws MetaException { + // Subscriber can get notification about partition finalization by + // listening on topic named "dbName.tableName" and message selector string + // as "HCAT_EVENT = HCAT_FINALIZE_PARTITION". Message body in this case + // will contain partition specification. + send(partName,dbName+"."+tblName,HCatConstants.HCAT_FINALIZE_PARTITON); + } + + /** + * @param msgBody is the metastore object. It is sent in full such that + * if subscriber is really interested in details, it can reconstruct it fully. + * In case of finalize_partition message this will be string specification of + * the partition. + * @param topicName is the name on message broker on which message is sent. + * @param event is the value of HCAT_EVENT property in message. It can be + * used to select messages in client side. + */ + private void send(Serializable msgBody, String topicName, String event){ + + if(null == session){ + // If we weren't able to setup the session in the constructor + // we cant send message in any case. + LOG.error("Invalid session. Failed to send message on topic: "+ + topicName + " event: "+event+ " containing message: "+msgBody); + return; + } + + try{ + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message. + Destination topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + ObjectMessage msg = session.createObjectMessage(msgBody); + msg.setStringProperty(HCatConstants.HCAT_EVENT, event); + producer.send(msg); + // Message must be transacted before we return. + session.commit(); + + } catch(Exception e){ + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: "+topicName + + " event: "+event+ " containing message: "+msgBody, e); + } + } + + @Override + protected void finalize() throws Throwable { + // Close the connection before dying. + try { + if(conn != null) { + conn.close(); + } + } catch (Exception ignore) { + LOG.info("Failed to close message bus connection.", ignore); + } + } +} \ No newline at end of file Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1095571) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -64,4 +64,16 @@ public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info"; public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf"; public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig"; + + public static final String HCAT_TOPIC = "HCAT"; + public static final String HCAT_EVENT = "HCAT_EVENT"; + public static final String HCAT_FINALIZE_PARTITON = "HCAT_FINALIZE_PARTITON"; + + public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION"; + public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION"; + public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE"; + public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE"; + public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE"; + public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE"; + }