diff --git a/pom.xml b/pom.xml
index 95fd8f2..3b8edeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
metastore
odbc
ql
+ repl
serde
service
shims
diff --git a/repl/pom.xml b/repl/pom.xml
new file mode 100644
index 0000000..498ed1f
--- /dev/null
+++ b/repl/pom.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>0.14.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-repl</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Replication Support</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- inter-project -->
+    <dependency>
+      <groupId>javax.jms</groupId>
+      <artifactId>jms</artifactId>
+      <version>${jms.version}</version>
+    </dependency>
+    <!-- test intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework</groupId>
+          <artifactId>spring-context</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>kahadb</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>hadoop-1</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>${hadoop-20S.version}</version>
+          <optional>true</optional>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hadoop-2</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${hadoop-23.version}</version>
+          <optional>true</optional>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+          <version>${hadoop-23.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <version>${hadoop-23.version}</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/repl/src/java/org/apache/hive/repl/common/Constants.java b/repl/src/java/org/apache/hive/repl/common/Constants.java
new file mode 100644
index 0000000..3a42610
--- /dev/null
+++ b/repl/src/java/org/apache/hive/repl/common/Constants.java
@@ -0,0 +1,25 @@
+package org.apache.hive.repl.common;
+
+public class Constants {
+
+ // Name markers for various event types
+
+ public static final String REPL_EVENT = "REPL_EVENT";
+
+ public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE";
+ public static final String DROP_DATABASE_EVENT = "DROP_DATABASE";
+ public static final String CREATE_TABLE_EVENT = "CREATE_TABLE";
+ public static final String DROP_TABLE_EVENT = "DROP_TABLE";
+ public static final String ADD_PARTITION_EVENT = "ADD_PARTITION";
+ public static final String DROP_PARTITION_EVENT = "DROP_PARTITION";
+
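+ // JMS string properties attached to every message, describing how the payload is serialized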
+ public static final String MESSAGE_VERSION = "MESSAGE_VERSION";
+ public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
+
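+ // Configuration / table-property keys controlling which JMS topics events are published to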
+ public static final String MSGBUS_TOPIC_NAME = "hive.repl.msgbus.topic.name";
+ public static final String MSGBUS_TOPIC_NAMING_POLICY = "hive.repl.msgbus.topic.naming.policy";
+ public static final String MSGBUS_TOPIC_PREFIX = "hive.repl.msgbus.topic.prefix";
+
+ public static final String DEFAULT_TOPIC_PREFIX = "hrepl";
+
+}
diff --git a/repl/src/java/org/apache/hive/repl/listener/ReplListener.java b/repl/src/java/org/apache/hive/repl/listener/ReplListener.java
new file mode 100644
index 0000000..39ec1a5
--- /dev/null
+++ b/repl/src/java/org/apache/hive/repl/listener/ReplListener.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.repl.listener;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hive.repl.common.Constants;
+import org.apache.hive.repl.messaging.EventMessage;
+import org.apache.hive.repl.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Implementation of
+ * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that publishes
+ * metastore events to JMS topics. Add/drop-partition messages are sent on a
+ * per-table topic whose name (of the form "topicPrefix.dbName.tableName") is
+ * stored in the table's properties. Create/drop-table messages are sent on a
+ * per-database topic named "topicPrefix.dbName", and create/drop-database
+ * messages are sent on the topic prefix itself. Every message carries a string
+ * property named "REPL_EVENT" whose value can be used to configure a message
+ * selector on the subscriber side.
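+ *
+ * To enable this listener, add it to the metastore's event-listener list, e.g. in
+ * hive-site.xml (illustrative snippet; the config key is hive.metastore.event.listeners):
+ * <pre>
+ *   &lt;property&gt;
+ *     &lt;name&gt;hive.metastore.event.listeners&lt;/name&gt;
+ *     &lt;value&gt;org.apache.hive.repl.listener.ReplListener&lt;/value&gt;
+ *   &lt;/property&gt;
+ * </pre>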
+ */
+public class ReplListener extends MetaStoreEventListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplListener.class);
+ protected Connection conn;
+ private static MessageFactory messageFactory = MessageFactory.getInstance();
+ public static final int NUM_RETRIES = 1;
+ private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+ private static final String HEALTH_CHECK_MSG = "JMS_HEALTH_CHECK_MESSAGE";
+
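+ // JMS Session objects are not thread-safe, so each metastore handler thread
+ // gets its own transacted session through this ThreadLocal.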
+ protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+ @Override
+ protected Session initialValue() {
+ try {
+ return createSession();
+ } catch (Exception e) {
+ LOG.error("Couldn't create JMS Session", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (get() != null) {
+ try {
+ get().close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS session, ignored error", e);
+ }
+ }
+ super.remove();
+ }
+ };
+
+ /**
+ * Create message bus connection and session in constructor.
+ */
+ public ReplListener(final Configuration conf) {
+ super(conf);
+ testAndCreateConnection();
+ }
+
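+ // Partition-level events publish to the topic recorded in the table's
+ // MSGBUS_TOPIC_NAME property (stamped onto the table in onCreateTable).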
+ private static String getTopicName(Table table, ListenerEvent partitionEvent) {
+ return table.getParameters().get(Constants.MSGBUS_TOPIC_NAME);
+ }
+
+ @Override
+ public void onAddPartition(AddPartitionEvent partitionEvent)
+ throws MetaException {
+ // Subscribers can get notification of newly added partitions in a
+ // particular table by listening on its per-table topic with message
+ // selector string "REPL_EVENT = 'ADD_PARTITION'"
+ if (partitionEvent.getStatus()) {
+ Table table = partitionEvent.getTable();
+ List<Partition> partitions = partitionEvent.getPartitions();
+ String topicName = getTopicName(table, partitionEvent);
+ if (topicName != null && !topicName.equals("")) {
+ send(messageFactory.buildAddPartitionEventMessage(table, partitions), topicName);
+ } else {
+ LOG.info("Topic name not found in metastore. Suppressing notification for "
+ + partitions.get(0).getDbName()
+ + "."
+ + partitions.get(0).getTableName()
+ + " To enable notifications for this table, please do alter table set properties ("
+ + Constants.MSGBUS_TOPIC_NAME
+ + "=.) or whatever you want topic name to be.");
+ }
+ }
+ }
+
+ /**
+ * Send dropped partition notifications. Subscribers can receive these notifications for a
+ * particular table by listening on the table's per-table topic with message selector
+ * string {@value org.apache.hive.repl.common.Constants#REPL_EVENT} =
+ * {@value org.apache.hive.repl.common.Constants#DROP_PARTITION_EVENT}.
+ *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+ * found to throw NPE when serializing objects that contain null. For this reason we override
+ * some fields in the StorageDescriptor of this notification. This should be fixed after
+ * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+ */
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+ if (partitionEvent.getStatus()) {
+ Partition partition = partitionEvent.getPartition();
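+ // Reset collection-valued fields of the StorageDescriptor before serializing;
+ // see the DataNucleus note in the method javadoc above.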
+ StorageDescriptor sd = partition.getSd();
+ sd.setBucketCols(new ArrayList<String>());
+ sd.setSortCols(new ArrayList<Order>());
+ sd.setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>());
+ String topicName = getTopicName(partitionEvent.getTable(), partitionEvent);
+ if (topicName != null && !topicName.equals("")) {
+ send(messageFactory.buildDropPartitionEventMessage(partitionEvent.getTable(), partition), topicName);
+ } else {
+ LOG.info("Topic name not found in metastore. Suppressing notification for "
+ + partition.getDbName()
+ + "."
+ + partition.getTableName()
+ + " To enable notifications for this table, please do alter table set properties ("
+ + Constants.MSGBUS_TOPIC_NAME
+ + "=.) or whatever you want topic name to be.");
+ }
+ }
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent)
+ throws MetaException {
+ // Subscribers can get notification about the creation of a database by
+ // listening on the topic-prefix topic with message selector string
+ // "REPL_EVENT = 'CREATE_DATABASE'"
+ if (dbEvent.getStatus()) {
+ String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
+ send(messageFactory.buildCreateDatabaseEventMessage(dbEvent.getDatabase()), topicName);
+ }
+ }
+
+ @Override
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+ // Subscribers can get notification about the drop of a database by
+ // listening on the topic-prefix topic with message selector string
+ // "REPL_EVENT = 'DROP_DATABASE'"
+ if (dbEvent.getStatus()) {
+ String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
+ send(messageFactory.buildDropDatabaseEventMessage(dbEvent.getDatabase()), topicName);
+ }
+ }
+
+ @Override
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+ // Subscribers can get notification about the creation of a table by
+ // listening on the per-database topic ("topicPrefix.dbName") with message
+ // selector string "REPL_EVENT = 'CREATE_TABLE'"
+ if (tableEvent.getStatus()) {
+ Table tbl = tableEvent.getTable();
+ HMSHandler handler = tableEvent.getHandler();
+ HiveConf conf = handler.getHiveConf();
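+ // Stamp the new table with its per-table topic name ("topicPrefix.dbName.tableName")
+ // so that later partition events know which topic to publish on.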
+ Table newTbl;
+ try {
+ newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
+ .deepCopy();
+ newTbl.getParameters().put(
+ Constants.MSGBUS_TOPIC_NAME,
+ getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
+ + newTbl.getTableName().toLowerCase());
+ handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
+ } catch (InvalidOperationException e) {
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ throw me;
+ } catch (NoSuchObjectException e) {
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ throw me;
+ }
+ String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase();
+ send(messageFactory.buildCreateTableEventMessage(newTbl), topicName);
+ }
+ }
+
+ private String getTopicPrefix(Configuration conf) {
+ return conf.get(Constants.MSGBUS_TOPIC_PREFIX,
+ Constants.DEFAULT_TOPIC_PREFIX);
+ }
+
+ /**
+ * Send dropped table notifications. Subscribers can receive these notifications for
+ * dropped tables by listening on the per-database topic ("topicPrefix.dbName") with message selector string
+ * {@value org.apache.hive.repl.common.Constants#REPL_EVENT} =
+ * {@value org.apache.hive.repl.common.Constants#DROP_TABLE_EVENT}
+ *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+ * found to throw NPE when serializing objects that contain null. For this reason we override
+ * some fields in the StorageDescriptor of this notification. This should be fixed after
+ * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+ */
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+ // Subscribers can get notification about the drop of a table by listening
+ // on the per-database topic ("topicPrefix.dbName") with message selector
+ // string "REPL_EVENT = 'DROP_TABLE'".
+ // See the method javadoc above for the DataNucleus serialization caveat.
+
+ if (tableEvent.getStatus()) {
+ Table table = tableEvent.getTable();
+ String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase();
+ send(messageFactory.buildDropTableEventMessage(table), topicName);
+ }
+ }
+
+ /**
+ * @param eventMessage the EventMessage to send over JMS.
+ * @param topicName the name of the topic on the message broker to publish to.
+ */
+ protected void send(EventMessage eventMessage, String topicName) {
+ send(eventMessage, topicName, NUM_RETRIES);
+ }
+
+ /**
+ * Sends the message, retrying on failure. This method is thread-safe.
+ * @param eventMessage the EventMessage to send over JMS.
+ * @param topicName the name of the topic on the message broker to publish to.
+ * @param retries the number of retry attempts remaining.
+ */
+ protected void send(EventMessage eventMessage, String topicName, int retries) {
+ try {
+ if (session.get() == null) {
+ // Need to reconnect
+ throw new JMSException("Invalid JMS session");
+ }
+ Destination topic = createTopic(topicName);
+ Message msg = session.get().createTextMessage(eventMessage.toString());
+
+ msg.setStringProperty(Constants.REPL_EVENT, eventMessage.getEventType().toString());
+ msg.setStringProperty(Constants.MESSAGE_VERSION, messageFactory.getVersion());
+ msg.setStringProperty(Constants.MESSAGE_FORMAT, messageFactory.getMessageFormat());
+ MessageProducer producer = createProducer(topic);
+ producer.send(msg);
+ // Message must be transacted before we return.
+ session.get().commit();
+ } catch (Exception e) {
+ if (retries >= 0) {
+ // this may happen if we were able to establish a connection once, but it's no longer valid
+ LOG.error("Seems like the connection is lost. Will retry. Retries left: " + retries + ". Error was:", e);
+ testAndCreateConnection();
+ send(eventMessage, topicName, retries - 1);
+ } else {
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: " + topicName +
+ " event: " + eventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+ }
+ }
+ }
+
+ /**
+ * Get the topic object for the topicName
+ *
+ * @param topicName The String identifying the message-topic.
+ * @return A {@link javax.jms.Topic} object corresponding to the specified topicName.
+ * @throws javax.jms.JMSException
+ */
+ protected Topic createTopic(final String topicName) throws JMSException {
+ return session.get().createTopic(topicName);
+ }
+
+ /**
+ * Does a health check on the connection by sending a dummy message,
+ * and recreates the connection if it is found to be bad.
+ * Also recreates the session.
+ */
+ protected synchronized void testAndCreateConnection() {
+ if (conn != null) {
+ // This method is reached when error occurs while sending msg, so the session must be bad
+ session.remove();
+ if (!isConnectionHealthy()) {
+ // I am the first thread to detect the error, cleanup old connection & reconnect
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS connection, ignored error", e);
+ }
+ conn = createConnection();
+ }
+ } else {
+ conn = createConnection();
+ }
+ try {
+ session.set(createSession());
+ } catch (JMSException e) {
+ LOG.error("Couldn't create JMS session, ignored the error", e);
+ }
+ }
+
+ /**
+ * Create the JMS connection
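+ *
+ * The ConnectionFactory is looked up through JNDI, so a jndi.properties file must be on
+ * the metastore classpath. A minimal sketch for ActiveMQ (broker URL is illustrative):
+ * <pre>
+ *   java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
+ *   java.naming.provider.url = tcp://localhost:61616
+ * </pre>
+ *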
+ * @return newly created JMS connection
+ */
+ protected Connection createConnection() {
+ LOG.info("Will create new JMS connection");
+ Context jndiCntxt;
+ Connection jmsConnection = null;
+ try {
+ jndiCntxt = new InitialContext();
+ ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+ jmsConnection = connFac.createConnection();
+ jmsConnection.start();
+ jmsConnection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException jmse) {
+ LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
+ }
+ });
+ } catch (NamingException e) {
+ LOG.error("JNDI error while setting up Message Bus connection. "
+ + "Please make sure file named 'jndi.properties' is in "
+ + "classpath and contains appropriate key-value pairs.", e);
+ } catch (JMSException e) {
+ LOG.error("Failed to initialize connection to message bus", e);
+ } catch (Throwable t) {
+ LOG.error("Unable to connect to JMS provider", t);
+ }
+ return jmsConnection;
+ }
+
+ /**
+ * Send a dummy message to probe if the JMS connection is healthy
+ * @return true if connection is healthy, false otherwise
+ */
+ protected boolean isConnectionHealthy() {
+ try {
+ Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+ MessageProducer producer = createProducer(topic);
+ Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
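+ // Probe message: non-persistent delivery, default JMS priority (4), no expiry (timeToLive = 0).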
+ producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates a JMS session
+ * @return newly created JMS session
+ * @throws javax.jms.JMSException
+ */
+ protected Session createSession() throws JMSException {
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ return conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ /**
+ * Create a JMS producer
+ * @param topic the JMS destination to create a producer for
+ * @return newly created message producer
+ * @throws javax.jms.JMSException
+ */
+ protected MessageProducer createProducer(Destination topic) throws JMSException {
+ return session.get().createProducer(topic);
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.error("Couldn't close jms connection, ignored the error", e);
+ }
+ }
+ }
+
+ @Override
+ public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+ throws MetaException {
+// TODO: Fix LoadPartitionDoneEvent. Currently, LPDE can only carry a single partition-spec. And that defeats the purpose.
+// if(lpde.getStatus())
+// send(lpde.getPartitionName(),lpde.getTable().getParameters().get(Constants.MSGBUS_TOPIC_NAME),Constants.PARTITION_DONE_EVENT);
+ }
+
+ @Override
+ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onAlterTable(AlterTableEvent ate) throws MetaException {
+ // no-op
+ }
+}
diff --git a/repl/src/java/org/apache/hive/repl/messaging/AddPartitionMessage.java b/repl/src/java/org/apache/hive/repl/messaging/AddPartitionMessage.java
new file mode 100644
index 0000000..c18b39a
--- /dev/null
+++ b/repl/src/java/org/apache/hive/repl/messaging/AddPartitionMessage.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.repl.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The message sent when partition(s) are added to a table.
+ */
+public abstract class AddPartitionMessage extends EventMessage {
+
+ protected AddPartitionMessage() {
+ super(EventType.ADD_PARTITION);
+ }
+
+ /**
+ * Getter for name of table (where partitions are added).
+ * @return Table-name (String).
+ */
+ public abstract String getTable();
+
+ /**
+ * Getter for list of partitions added.
+ * @return List of maps, where each map identifies values for each partition-key, for every added partition.
+ */
+ public abstract List