diff --git a/pom.xml b/pom.xml index 95fd8f2..3b8edeb 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ metastore odbc ql + repl serde service shims diff --git a/repl/pom.xml b/repl/pom.xml new file mode 100644 index 0000000..498ed1f --- /dev/null +++ b/repl/pom.xml @@ -0,0 +1,153 @@ + + + + 4.0.0 + + org.apache.hive + hive + 0.14.0-SNAPSHOT + ../pom.xml + + + hive-repl + jar + Hive Replication Support + + + .. + + + + + + + org.apache.hive + hive-common + ${project.version} + + + org.apache.hive + hive-exec + ${project.version} + + + org.apache.hive + hive-metastore + ${project.version} + + + org.apache.hive + hive-shims + ${project.version} + + + + javax.jms + jms + ${jms.version} + + + + org.apache.hive + hive-cli + ${project.version} + test + + + + junit + junit + ${junit.version} + test + + + org.apache.activemq + activemq-core + ${activemq.version} + test + + + org.springframework + spring-context + + + + + org.apache.activemq + kahadb + ${activemq.version} + test + + + + + + hadoop-1 + + + org.apache.hadoop + hadoop-core + ${hadoop-20S.version} + true + + + + + hadoop-2 + + + org.apache.hadoop + hadoop-common + ${hadoop-23.version} + true + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop-23.version} + test + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop-23.version} + test + + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + diff --git a/repl/src/java/org/apache/hive/repl/common/Constants.java b/repl/src/java/org/apache/hive/repl/common/Constants.java new file mode 100644 index 0000000..3a42610 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/common/Constants.java @@ -0,0 +1,25 @@ +package org.apache.hive.repl.common; + +public class Constants { + + // Name markers for various event types + + public static final String REPL_EVENT = "REPL_EVENT"; + + public static final String CREATE_DATABASE_EVENT = "CREATE_DATABASE"; + public static final String DROP_DATABASE_EVENT = "DROP_DATABASE"; + public static final String CREATE_TABLE_EVENT = "CREATE_TABLE"; + public static final String DROP_TABLE_EVENT = "DROP_TABLE"; + public static final String ADD_PARTITION_EVENT = "ADD_PARTITION"; + public static final String DROP_PARTITION_EVENT = "DROP_PARTITION"; + + public static final String MESSAGE_VERSION = "MESSAGE_VERSION"; + public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT"; + + public static final String MSGBUS_TOPIC_NAME = "hive.repl.msgbus.topic.name"; + public static final String MSGBUS_TOPIC_NAMING_POLICY = "hive.repl.msgbus.topic.naming.policy"; + public static final String MSGBUS_TOPIC_PREFIX = "hive.repl.msgbus.topic.prefix"; + + public static final String DEFAULT_TOPIC_PREFIX = "hrepl"; + +} diff --git a/repl/src/java/org/apache/hive/repl/listener/ReplListener.java b/repl/src/java/org/apache/hive/repl/listener/ReplListener.java new file mode 100644 index 0000000..39ec1a5 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/listener/ReplListener.java @@ -0,0 +1,446 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.listener; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hive.repl.common.Constants; +import org.apache.hive.repl.messaging.EventMessage; +import org.apache.hive.repl.messaging.MessageFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * Implementation of + * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} It sends + * message on two type of topics. One has name of form dbName.tblName On this + * topic, two kind of messages are sent: add/drop partition and + * finalize_partition message. Second topic has name and messages sent on + * it are: add/drop database and add/drop table. All messages also has a + * property named "REPL_EVENT" set on them whose value can be used to configure + * message selector on subscriber side. + */ +public class ReplListener extends MetaStoreEventListener { + + private static final Logger LOG = LoggerFactory.getLogger(ReplListener.class); + protected Connection conn; + private static MessageFactory messageFactory = MessageFactory.getInstance(); + public static final int NUM_RETRIES = 1; + private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check"; + private static final String HEALTH_CHECK_MSG = "JMS_HEALTH_CHECK_MESSAGE"; + + protected final ThreadLocal session = new ThreadLocal() { + @Override + protected Session initialValue() { + try { + return createSession(); + } catch (Exception e) { + LOG.error("Couldn't create JMS Session", e); + return null; + } + } + + @Override + public void remove() { + if (get() != null) { + try { + get().close(); + } catch (Exception e) { + LOG.error("Unable to close bad JMS session, ignored error", e); + } + } + super.remove(); + } + }; + + /** + * Create message bus connection and session in constructor. + */ + public ReplListener(final Configuration conf) { + super(conf); + testAndCreateConnection(); + } + + private static String getTopicName(Table table, ListenerEvent partitionEvent) { + return table.getParameters().get(Constants.MSGBUS_TOPIC_NAME); + } + + @Override + public void onAddPartition(AddPartitionEvent partitionEvent) + throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "REPL_EVENT = ADD_PARTITION" + if (partitionEvent.getStatus()) { + Table table = partitionEvent.getTable(); + List partitions = partitionEvent.getPartitions(); + String topicName = getTopicName(table, partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(messageFactory.buildAddPartitionEventMessage(table, partitions), topicName); + } else { + LOG.info("Topic name not found in metastore. Suppressing notification for " + + partitions.get(0).getDbName() + + "." + + partitions.get(0).getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + Constants.MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + } + + /** + * Send dropped partition notifications. Subscribers can receive these notifications for a + * particular table by listening on a topic named "dbName.tableName" with message selector + * string {@value org.apache.hive.repl.common.Constants#REPL_EVENT} = + * {@value org.apache.hive.repl.common.Constants#DROP_PARTITION_EVENT}. + *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been + * found to throw NPE when serializing objects that contain null. For this reason we override + * some fields in the StorageDescriptor of this notification. This should be fixed after + * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. + */ + @Override + public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { + if (partitionEvent.getStatus()) { + Partition partition = partitionEvent.getPartition(); + StorageDescriptor sd = partition.getSd(); + sd.setBucketCols(new ArrayList()); + sd.setSortCols(new ArrayList()); + sd.setParameters(new HashMap()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSkewedInfo().setSkewedColNames(new ArrayList()); + String topicName = getTopicName(partitionEvent.getTable(), partitionEvent); + if (topicName != null && !topicName.equals("")) { + send(messageFactory.buildDropPartitionEventMessage(partitionEvent.getTable(), partition), topicName); + } else { + LOG.info("Topic name not found in metastore. Suppressing notification for " + + partition.getDbName() + + "." + + partition.getTableName() + + " To enable notifications for this table, please do alter table set properties (" + + Constants.MSGBUS_TOPIC_NAME + + "=.) or whatever you want topic name to be."); + } + } + } + + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) + throws MetaException { + // Subscriber can get notification about addition of a database in + // by listening on a topic and message selector string + // as "REPL_EVENT = ADD_DATABASE" + if (dbEvent.getStatus()) { + String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); + send(messageFactory.buildCreateDatabaseEventMessage(dbEvent.getDatabase()), topicName); + } + } + + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about drop of a database in + // by listening on a topic named "REPL" and message selector string + // as "REPL_EVENT = DROP_DATABASE" + if (dbEvent.getStatus()) { + String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf()); + send(messageFactory.buildDropDatabaseEventMessage(dbEvent.getDatabase()), topicName); + } + } + + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about addition of a table + // by listening on a topic named "REPL" and message selector string + // as "REPL_EVENT = ADD_TABLE" + if (tableEvent.getStatus()) { + Table tbl = tableEvent.getTable(); + HMSHandler handler = tableEvent.getHandler(); + HiveConf conf = handler.getHiveConf(); + Table newTbl; + try { + newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) + .deepCopy(); + newTbl.getParameters().put( + Constants.MSGBUS_TOPIC_NAME, + getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." + + newTbl.getTableName().toLowerCase()); + handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); + } catch (InvalidOperationException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } catch (NoSuchObjectException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } + String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); + send(messageFactory.buildCreateTableEventMessage(newTbl), topicName); + } + } + + private String getTopicPrefix(Configuration conf) { + return conf.get(Constants.MSGBUS_TOPIC_PREFIX, + Constants.DEFAULT_TOPIC_PREFIX); + } + + /** + * Send dropped table notifications. Subscribers can receive these notifications for + * dropped tables by listening on topic "REPL" with message selector string + * {@value org.apache.hive.repl.common.Constants#REPL_EVENT} = + * {@value org.apache.hive.repl.common.Constants#DROP_TABLE_EVENT} + *
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been + * found to throw NPE when serializing objects that contain null. For this reason we override + * some fields in the StorageDescriptor of this notification. This should be fixed after + * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved. + */ + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about drop of a table + // by listening on a topic named "REPL" and message selector string + // as "REPL_EVENT = DROP_TABLE" + + // Datanucleus throws NPE when we try to serialize a table object + // retrieved from metastore. To workaround that we reset following objects + + if (tableEvent.getStatus()) { + Table table = tableEvent.getTable(); + String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase(); + send(messageFactory.buildDropTableEventMessage(table), topicName); + } + } + + /** + * @param eventMessage The EventMessage being sent over JMS. + * @param topicName is the name on message broker on which message is sent. + */ + protected void send(EventMessage eventMessage, String topicName) { + send(eventMessage, topicName, NUM_RETRIES); + } + + /** + * @param eventMessage The EventMessage being sent over JMS, this method is threadsafe + * @param topicName is the name on message broker on which message is sent. + * @param retries the number of retry attempts + */ + protected void send(EventMessage eventMessage, String topicName, int retries) { + try { + if (session.get() == null) { + // Need to reconnect + throw new JMSException("Invalid JMS session"); + } + Destination topic = createTopic(topicName); + Message msg = session.get().createTextMessage(eventMessage.toString()); + + msg.setStringProperty(Constants.REPL_EVENT, eventMessage.getEventType().toString()); + msg.setStringProperty(Constants.MESSAGE_VERSION, messageFactory.getVersion()); + msg.setStringProperty(Constants.MESSAGE_FORMAT, messageFactory.getMessageFormat()); + MessageProducer producer = createProducer(topic); + producer.send(msg); + // Message must be transacted before we return. + session.get().commit(); + } catch (Exception e) { + if (retries >= 0) { + // this may happen if we were able to establish connection once, but its no longer valid + LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e); + testAndCreateConnection(); + send(eventMessage, topicName, retries - 1); + } else { + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: " + topicName + + " event: " + eventMessage.getEventType() + " after retries: " + NUM_RETRIES, e); + } + } + } + + /** + * Get the topic object for the topicName + * + * @param topicName The String identifying the message-topic. + * @return A {@link javax.jms.Topic} object corresponding to the specified topicName. + * @throws javax.jms.JMSException + */ + protected Topic createTopic(final String topicName) throws JMSException { + return session.get().createTopic(topicName); + } + + /** + * Does a health check on the connection by sending a dummy message. + * Create the connection if the connection is found to be bad + * Also recreates the session + */ + protected synchronized void testAndCreateConnection() { + if (conn != null) { + // This method is reached when error occurs while sending msg, so the session must be bad + session.remove(); + if (!isConnectionHealthy()) { + // I am the first thread to detect the error, cleanup old connection & reconnect + try { + conn.close(); + } catch (Exception e) { + LOG.error("Unable to close bad JMS connection, ignored error", e); + } + conn = createConnection(); + } + } else { + conn = createConnection(); + } + try { + session.set(createSession()); + } catch (JMSException e) { + LOG.error("Couldn't create JMS session, ignored the error", e); + } + } + + /** + * Create the JMS connection + * @return newly created JMS connection + */ + protected Connection createConnection() { + LOG.info("Will create new JMS connection"); + Context jndiCntxt; + Connection jmsConnection = null; + try { + jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory"); + jmsConnection = connFac.createConnection(); + jmsConnection.start(); + jmsConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException jmse) { + LOG.error("JMS Exception listener received exception. Ignored the error", jmse); + } + }); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.", e); + } catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus", e); + } catch (Throwable t) { + LOG.error("Unable to connect to JMS provider", t); + } + return jmsConnection; + } + + /** + * Send a dummy message to probe if the JMS connection is healthy + * @return true if connection is healthy, false otherwise + */ + protected boolean isConnectionHealthy() { + try { + Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX); + MessageProducer producer = createProducer(topic); + Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG); + producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0); + } catch (Exception e) { + return false; + } + return true; + } + + /** + * Creates a JMS session + * @return newly create JMS session + * @throws javax.jms.JMSException + */ + protected Session createSession() throws JMSException { + // We want message to be sent when session commits, thus we run in + // transacted mode. + return conn.createSession(true, Session.SESSION_TRANSACTED); + } + + /** + * Create a JMS producer + * @param topic + * @return newly created message producer + * @throws javax.jms.JMSException + */ + protected MessageProducer createProducer(Destination topic) throws JMSException { + return session.get().createProducer(topic); + } + + @Override + protected void finalize() throws Throwable { + if (conn != null) { + try { + conn.close(); + } catch (Exception e) { + LOG.error("Couldn't close jms connection, ignored the error", e); + } + } + } + + @Override + public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) + throws MetaException { +// TODO: Fix LoadPartitionDoneEvent. Currently, LPDE can only carry a single partition-spec. And that defeats the purpose. +// if(lpde.getStatus()) +// send(lpde.getPartitionName(),lpde.getTable().getParameters().get(Constants.MSGBUS_TOPIC_NAME),Constants.PARTITION_DONE_EVENT); + } + + @Override + public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { + // no-op + } + + @Override + public void onAlterTable(AlterTableEvent ate) throws MetaException { + // no-op + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/AddPartitionMessage.java b/repl/src/java/org/apache/hive/repl/messaging/AddPartitionMessage.java new file mode 100644 index 0000000..c18b39a --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/AddPartitionMessage.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +import java.util.List; +import java.util.Map; + +/** + * The message sent when partition(s) are added to a table. + */ +public abstract class AddPartitionMessage extends EventMessage { + + protected AddPartitionMessage() { + super(EventType.ADD_PARTITION); + } + + /** + * Getter for name of table (where partitions are added). + * @return Table-name (String). + */ + public abstract String getTable(); + + /** + * Getter for list of partitions added. + * @return List of maps, where each map identifies values for each partition-key, for every added partition. + */ + public abstract List> getPartitions(); + + // TODO : add in PartitionSpecs support after HIVE-7223 + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} + diff --git a/repl/src/java/org/apache/hive/repl/messaging/CreateDatabaseMessage.java b/repl/src/java/org/apache/hive/repl/messaging/CreateDatabaseMessage.java new file mode 100644 index 0000000..b4eebea --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/CreateDatabaseMessage.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +/** + * Event Message sent when a Database is created in the metastore. + */ +public abstract class CreateDatabaseMessage extends EventMessage { + + protected CreateDatabaseMessage() { + super(EventType.CREATE_DATABASE); + } + +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/CreateTableMessage.java b/repl/src/java/org/apache/hive/repl/messaging/CreateTableMessage.java new file mode 100644 index 0000000..abdbc6a --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/CreateTableMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +/** + * Event message sent when a table is created in the metastore. + */ +public abstract class CreateTableMessage extends EventMessage { + + protected CreateTableMessage() { + super(EventType.CREATE_TABLE); + } + + /** + * Getter for the name of table created in the metastore. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/DropDatabaseMessage.java b/repl/src/java/org/apache/hive/repl/messaging/DropDatabaseMessage.java new file mode 100644 index 0000000..42f97e1 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/DropDatabaseMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +/** + * Event message sent when a Database is dropped from the metastore. + */ +public abstract class DropDatabaseMessage extends EventMessage { + + protected DropDatabaseMessage() { + super(EventType.DROP_DATABASE); + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/DropPartitionMessage.java b/repl/src/java/org/apache/hive/repl/messaging/DropPartitionMessage.java new file mode 100644 index 0000000..9d08afd --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/DropPartitionMessage.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +import java.util.List; +import java.util.Map; + +/** + * Event Message sent when a partition is dropped in the metastore. + */ +public abstract class DropPartitionMessage extends EventMessage { + + protected DropPartitionMessage() { + super(EventType.DROP_PARTITION); + } + + public abstract String getTable(); + public abstract List> getPartitions (); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + return super.checkValid(); + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/DropTableEventMessage.java b/repl/src/java/org/apache/hive/repl/messaging/DropTableEventMessage.java new file mode 100644 index 0000000..b599e13 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/DropTableEventMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +/** + * Message sent when a Table is dropped in the metastore. + */ +public abstract class DropTableEventMessage extends EventMessage { + + protected DropTableEventMessage() { + super(EventType.DROP_TABLE); + } + + /** + * Getter for the name of the table being dropped. + * @return Table-name (String). + */ + public abstract String getTable(); + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + return super.checkValid(); + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/EventMessage.java b/repl/src/java/org/apache/hive/repl/messaging/EventMessage.java new file mode 100644 index 0000000..0cb74b0 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/EventMessage.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +import org.apache.hive.repl.common.Constants; + +/** + * Class representing messages emitted when Metastore operations are done. + * (E.g. Creation and deletion of databases, tables and partitions.) + */ +public abstract class EventMessage { + + /** + * Enumeration of all supported types of Metastore operations. + */ + public static enum EventType { + + CREATE_DATABASE(Constants.CREATE_DATABASE_EVENT), + DROP_DATABASE(Constants.DROP_DATABASE_EVENT), + CREATE_TABLE(Constants.CREATE_TABLE_EVENT), + DROP_TABLE(Constants.DROP_TABLE_EVENT), + ADD_PARTITION(Constants.ADD_PARTITION_EVENT), + DROP_PARTITION(Constants.DROP_PARTITION_EVENT); + + private String typeString; + + EventType(String typeString) { + this.typeString = typeString; + } + + @Override + public String toString() { return typeString; } + } + + protected EventType eventType; + + protected EventMessage(EventType eventType) { + this.eventType = eventType; + } + + public EventType getEventType() { + return eventType; + } + + /** + * Getter for Metastore's URL. + * (This is where the event originates from.) + * @return Metastore Server's URL (String). + */ + public abstract String getServer(); + + /** + * Getter for the Kerberos principal of the Metastore service. + * @return Metastore Service Principal (String). + */ + public abstract String getServicePrincipal(); + + /** + * Getter for the name of the Database on which the Metastore operation is done. + * @return Database-name (String). + */ + public abstract String getDB(); + + /** + * Getter for the timestamp associated with the operation. + * @return Timestamp (Long - seconds since epoch). + */ + public abstract Long getTimestamp(); + + /** + * Class invariant. Checked after construction or deserialization. + */ + public EventMessage checkValid() { + if (getServer() == null || getServicePrincipal() == null) + throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null."); + if (getEventType() == null) + throw new IllegalStateException("Event-type unset."); + if (getDB() == null) + throw new IllegalArgumentException("DB-name unset."); + + return this; + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/MessageDeserializer.java b/repl/src/java/org/apache/hive/repl/messaging/MessageDeserializer.java new file mode 100644 index 0000000..0d1f457 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/MessageDeserializer.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +/** + * Interface for converting event messages from String-form back to EventMessage instances. + */ +public abstract class MessageDeserializer { + + /** + * Method to construct EventMessage from string. + */ + public EventMessage getEventMessage(String eventTypeString, String messageBody) { + System.err.println("checking eventTypeString["+eventTypeString+"], messageBody:\n\t"+messageBody); + try { + switch (EventMessage.EventType.valueOf(eventTypeString)) { + case CREATE_DATABASE: + return getCreateDatabaseMessage(messageBody); + case DROP_DATABASE: + return getDropDatabaseMessage(messageBody); + case CREATE_TABLE: + return getCreateTableMessage(messageBody); + case DROP_TABLE: + return getDropTableMessage(messageBody); + case ADD_PARTITION: + return getAddPartitionMessage(messageBody); + case DROP_PARTITION: + return getDropPartitionMessage(messageBody); + + default: + throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); + } + } catch (IllegalArgumentException iae) { + iae.printStackTrace(System.err); + throw iae; + } + } + + /** + * Method to de-serialize CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody); + + /** + * Method to de-serialize DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody); + + /** + * Method to de-serialize CreateTableMessage instance. + */ + public abstract CreateTableMessage getCreateTableMessage(String messageBody); + + /** + * Method to de-serialize DropTableMessage instance. + */ + public abstract DropTableEventMessage getDropTableMessage(String messageBody); + + /** + * Method to de-serialize AddPartitionMessage instance. + */ + public abstract AddPartitionMessage getAddPartitionMessage(String messageBody); + + /** + * Method to de-serialize DropPartitionMessage instance. + */ + public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); + + // Protection against construction. + protected MessageDeserializer() {} +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/MessageFactory.java b/repl/src/java/org/apache/hive/repl/messaging/MessageFactory.java new file mode 100644 index 0000000..485b524 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/MessageFactory.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.repl.messaging.json.JSONMessageFactory; + +import java.util.List; + +/** + * Abstract Factory for the construction of message instances. + */ +public abstract class MessageFactory { + + private static MessageFactory instance = new JSONMessageFactory(); + + protected static final HiveConf hiveConf = new HiveConf(); + static { + hiveConf.addResource("hive-site.xml"); + } + + private static final String CONF_LABEL_REPL_MESSAGE_FACTORY_IMPL_PREFIX = "repl.message.factory.impl."; + private static final String CONF_LABEL_REPL_MESSAGE_FORMAT = "repl.message.format"; + private static final String REPL_MESSAGE_FORMAT = hiveConf.get(CONF_LABEL_REPL_MESSAGE_FORMAT, "json"); + private static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hive.repl.messaging.json.JSONMessageFactory"; + private static final String REPL_MESSAGE_FACTORY_IMPL = hiveConf.get(CONF_LABEL_REPL_MESSAGE_FACTORY_IMPL_PREFIX + + REPL_MESSAGE_FORMAT, + DEFAULT_MESSAGE_FACTORY_IMPL); + + protected static final String SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.name(), ""); + protected static final String SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.name(), ""); + + /** + * Getter for MessageFactory instance. + */ + public static MessageFactory getInstance() { + if (instance == null) { + instance = getInstance(REPL_MESSAGE_FACTORY_IMPL); + } + return instance; + } + + private static MessageFactory getInstance(String className) { + try { + return (MessageFactory)ReflectionUtils.newInstance(Class.forName(className), hiveConf); + } + catch (ClassNotFoundException classNotFound) { + throw new IllegalStateException("Could not construct MessageFactory implementation: ", classNotFound); + } + } + + /** + * Getter for MessageDeserializer, corresponding to the specified format and version. + * @param format Serialization format for notifications. + * @param version Version of serialization format (currently ignored.) + * @return MessageDeserializer. + */ + public static MessageDeserializer getDeserializer(String format, + String version) { + return getInstance(hiveConf.get(CONF_LABEL_REPL_MESSAGE_FACTORY_IMPL_PREFIX + format, + DEFAULT_MESSAGE_FACTORY_IMPL)).getDeserializer(); + } + + public abstract MessageDeserializer getDeserializer(); + + /** + * Getter for version-string, corresponding to all constructed messages. + */ + public abstract String getVersion(); + + /** + * Getter for message-format. + */ + public abstract String getMessageFormat(); + + /** + * Factory method for CreateDatabaseMessage. + * @param db The Database being added. + * @return CreateDatabaseMessage instance. + */ + public abstract CreateDatabaseMessage buildCreateDatabaseEventMessage(Database db); + + /** + * Factory method for DropDatabaseMessage. + * @param db The Database being dropped. + * @return DropDatabaseMessage instance. + */ + public abstract DropDatabaseMessage buildDropDatabaseEventMessage(Database db); + + /** + * Factory method for CreateTableMessage. + * @param table The Table being created. + * @return CreateTableMessage instance. + */ + public abstract CreateTableMessage buildCreateTableEventMessage(Table table); + + /** + * Factory method for DropTableEventMessage. + * @param table The Table being dropped. + * @return DropTableEventMessage instance. + */ + public abstract DropTableEventMessage buildDropTableEventMessage(Table table); + + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partitions are added. + * @param partitions The set of Partitions being added. + * @return AddPartitionMessage instance. + */ + public abstract AddPartitionMessage buildAddPartitionEventMessage(Table table, List partitions); + + /** + * Factory method for DropPartitionMessage. + * @param table The Table from which the partition is dropped. + * @param partition The Partition being dropped. + * @return DropPartitionMessage instance. + */ + public abstract DropPartitionMessage buildDropPartitionEventMessage(Table table, Partition partition); +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/jms/MessagingUtils.java b/repl/src/java/org/apache/hive/repl/messaging/jms/MessagingUtils.java new file mode 100644 index 0000000..b05f1ed --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/jms/MessagingUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.jms; + +import org.apache.commons.lang.StringUtils; +import org.apache.hive.repl.common.Constants; +import org.apache.hive.repl.messaging.EventMessage; +import org.apache.hive.repl.messaging.MessageFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +/** + * Helper Utility to assist consumers of Messages in extracting + * message-content from JMS messages. + */ +public class MessagingUtils { + + /** + * Method to return EventMessage contained in the JMS message. + * @param message The JMS Message instance + * @return The contained EventMessage + */ + public static EventMessage getMessage(Message message) { + try { + String messageBody = ((TextMessage)message).getText(); + String eventType = message.getStringProperty(Constants.REPL_EVENT); + String messageVersion = message.getStringProperty(Constants.MESSAGE_VERSION); + String messageFormat = message.getStringProperty(Constants.MESSAGE_FORMAT); + + if (StringUtils.isEmpty(messageBody) || StringUtils.isEmpty(eventType)) + throw new IllegalArgumentException("Could not extract EventMessage. " + + "EventType and/or MessageBody is null/empty."); + + return MessageFactory.getDeserializer(messageFormat, messageVersion).getEventMessage(eventType, messageBody); + } + catch (JMSException exception) { + throw new IllegalArgumentException("Could not extract EventMessage. ", exception); + } + } + + // Prevent construction. + private MessagingUtils() {} +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONAddPartitionMessage.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONAddPartitionMessage.java new file mode 100644 index 0000000..5db5d2b --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONAddPartitionMessage.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.AddPartitionMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of AddPartitionEventMessage. + */ +public class JSONAddPartitionMessage extends AddPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONAddPartitionMessage() {} + + public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public String getTable() { return table; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public List> getPartitions () { return partitions; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONCreateDatabaseMessage.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONCreateDatabaseMessage.java new file mode 100644 index 0000000..76099da --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONCreateDatabaseMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.CreateDatabaseMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateDatabaseMessage. + */ +public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { + + @JsonProperty + String server, servicePrincipal, db; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONCreateDatabaseMessage() {} + + public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONCreateTableMessage.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONCreateTableMessage.java new file mode 100644 index 0000000..ae796dc --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONCreateTableMessage.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.CreateTableMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of CreateTableMessage. + */ +public class JSONCreateTableMessage extends CreateTableMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONCreateTableMessage() {} + + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String getTable() { return table; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropDatabaseMessage.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropDatabaseMessage.java new file mode 100644 index 0000000..6485cefb --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropDatabaseMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.DropDatabaseMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropDatabaseMessage. + */ +public class JSONDropDatabaseMessage extends DropDatabaseMessage { + + @JsonProperty + String server, servicePrincipal, db; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONDropDatabaseMessage() {} + + public JSONDropDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropPartitionMessage.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropPartitionMessage.java new file mode 100644 index 0000000..30f78d1 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropPartitionMessage.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.DropPartitionMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + * JSON implementation of DropPartitionMessage. + */ +public class JSONDropPartitionMessage extends DropPartitionMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + @JsonProperty + List> partitions; + + /** + * Default Constructor. Required for Jackson. + */ + public JSONDropPartitionMessage() {} + + public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table, + List> partitions, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.partitions = partitions; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public String getTable() { return table; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public List> getPartitions () { return partitions; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropTableEventMessage.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropTableEventMessage.java new file mode 100644 index 0000000..c388f58 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONDropTableEventMessage.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.DropTableEventMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropTableEventMessage. + */ +public class JSONDropTableEventMessage extends DropTableEventMessage { + + @JsonProperty + String server, servicePrincipal, db, table; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONDropTableEventMessage() {} + + public JSONDropTableEventMessage(String server, String servicePrincipal, String db, String table, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = db; + this.table = table; + this.timestamp = timestamp; + checkValid(); + } + + + @Override + public String getTable() { return table; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public String getDB() { return db; } + + @Override + public Long getTimestamp() { return timestamp; } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONMessageDeserializer.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONMessageDeserializer.java new file mode 100644 index 0000000..e5ec854 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONMessageDeserializer.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hive.repl.messaging.AddPartitionMessage; +import org.apache.hive.repl.messaging.CreateDatabaseMessage; +import org.apache.hive.repl.messaging.CreateTableMessage; +import org.apache.hive.repl.messaging.DropDatabaseMessage; +import org.apache.hive.repl.messaging.DropPartitionMessage; +import org.apache.hive.repl.messaging.DropTableEventMessage; +import org.apache.hive.repl.messaging.MessageDeserializer; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * MessageDeserializer implementation, for deserializing from JSON strings. + */ +public class JSONMessageDeserializer extends MessageDeserializer { + + static ObjectMapper mapper = new ObjectMapper(); // Thread-safe. + + static { + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateDatabaseMessage.", exception); + } + } + + @Override + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropDatabaseMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateTableMessage getCreateTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateTableMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateTableMessage.", exception); + } + } + + @Override + public DropTableEventMessage getDropTableMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropTableEventMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropTableMessage.", exception); + } + } + + @Override + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAddPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct AddPartitionMessage.", exception); + } + } + + @Override + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropPartitionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct DropPartitionMessage.", exception); + } + } +} diff --git a/repl/src/java/org/apache/hive/repl/messaging/json/JSONMessageFactory.java b/repl/src/java/org/apache/hive/repl/messaging/json/JSONMessageFactory.java new file mode 100644 index 0000000..60746df --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/messaging/json/JSONMessageFactory.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hive.repl.messaging.AddPartitionMessage; +import org.apache.hive.repl.messaging.CreateDatabaseMessage; +import org.apache.hive.repl.messaging.CreateTableMessage; +import org.apache.hive.repl.messaging.DropDatabaseMessage; +import org.apache.hive.repl.messaging.DropPartitionMessage; +import org.apache.hive.repl.messaging.DropTableEventMessage; +import org.apache.hive.repl.messaging.MessageDeserializer; +import org.apache.hive.repl.messaging.MessageFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * The JSON implementation of the MessageFactory. Constructs JSON implementations of + * each message-type. + */ +public class JSONMessageFactory extends MessageFactory { + + private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); + + @Override + public MessageDeserializer getDeserializer() { + return deserializer; + } + + @Override + public String getVersion() { + return "0.1"; + } + + @Override + public String getMessageFormat() { + return "json"; + } + + @Override + public CreateDatabaseMessage buildCreateDatabaseEventMessage(Database db) { + return new JSONCreateDatabaseMessage(SERVER_URL, SERVICE_PRINCIPAL, db.getName(), + System.currentTimeMillis() / 1000); + } + + @Override + public DropDatabaseMessage buildDropDatabaseEventMessage(Database db) { + return new JSONDropDatabaseMessage(SERVER_URL, SERVICE_PRINCIPAL, db.getName(), + System.currentTimeMillis() / 1000); + } + + @Override + public CreateTableMessage buildCreateTableEventMessage(Table table) { + return new JSONCreateTableMessage(SERVER_URL, SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), System.currentTimeMillis()/1000); + } + + @Override + public DropTableEventMessage buildDropTableEventMessage(Table table) { + return new JSONDropTableEventMessage(SERVER_URL, SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), + System.currentTimeMillis()/1000); + } + + @Override + public AddPartitionMessage buildAddPartitionEventMessage(Table table, List partitions) { + return new JSONAddPartitionMessage(SERVER_URL, SERVICE_PRINCIPAL, table.getDbName(), + table.getTableName(), getPartitionKeyValues(table, partitions), System.currentTimeMillis()/1000); + } + + @Override + public DropPartitionMessage buildDropPartitionEventMessage(Table table, Partition partition) { + return new JSONDropPartitionMessage(SERVER_URL, SERVICE_PRINCIPAL, partition.getDbName(), + partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), + System.currentTimeMillis()/1000); + } + + private static Map getPartitionKeyValues(Table table, Partition partition) { + Map partitionKeys = new LinkedHashMap(); + for (int i=0; i> getPartitionKeyValues(Table table, List partitions) { + List> partitionList = new ArrayList>(partitions.size()); + for (Partition partition : partitions) + partitionList.add(getPartitionKeyValues(table, partition)); + return partitionList; + } +} diff --git a/repl/src/java/org/apache/hive/repl/package-info.java b/repl/src/java/org/apache/hive/repl/package-info.java new file mode 100644 index 0000000..78352d1 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Hive Replication support - This whole package should be considered @Evolving @Unstable + */ +package org.apache.hive.repl; \ No newline at end of file diff --git a/repl/src/test/org/apache/hive/repl/listener/TestMsgBusConnection.java b/repl/src/test/org/apache/hive/repl/listener/TestMsgBusConnection.java new file mode 100644 index 0000000..4471d90 --- /dev/null +++ b/repl/src/test/org/apache/hive/repl/listener/TestMsgBusConnection.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.listener; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.repl.common.Constants; +import org.apache.hive.repl.messaging.EventMessage; +import org.apache.hive.repl.messaging.jms.MessagingUtils; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class TestMsgBusConnection extends TestCase { + + private Driver driver; + private BrokerService broker; + private MessageConsumer consumer; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + broker = new BrokerService(); + // configure the broker + broker.addConnector("tcp://localhost:61616?broker.persistent=false"); + + broker.start(); + + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + ReplListener.class.getName()); + hiveConf.set(ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(Constants.MSGBUS_TOPIC_PREFIX, "hnotif"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + private void connectClient() throws JMSException { + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "tcp://localhost:61616"); + Connection conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination topic = session.createTopic("hnotif"); + consumer = session.createConsumer(topic); + } + + public void testConnection() throws Exception { + + try { + driver.run("create database testconndb"); + Message msg = consumer.receive(); + assertTrue("Expected TextMessage", msg instanceof TextMessage); + assertEquals(Constants.CREATE_DATABASE_EVENT, + msg.getStringProperty(Constants.REPL_EVENT)); + assertEquals("topic://hnotif", msg.getJMSDestination().toString()); + EventMessage messageObject = MessagingUtils.getMessage(msg); + assertEquals("testconndb", messageObject.getDB()); + broker.stop(); + driver.run("drop database testconndb cascade"); + broker.start(true); + connectClient(); + driver.run("create database testconndb"); + msg = consumer.receive(); + assertEquals(Constants.CREATE_DATABASE_EVENT, + msg.getStringProperty(Constants.REPL_EVENT)); + assertEquals("topic://hnotif", msg.getJMSDestination().toString()); + assertEquals("testconndb", messageObject.getDB()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(); + assertEquals(Constants.DROP_DATABASE_EVENT, + msg.getStringProperty(Constants.REPL_EVENT)); + assertEquals("topic://hnotif", msg.getJMSDestination().toString()); + assertEquals("testconndb", messageObject.getDB()); + } catch (NoSuchObjectException nsoe) { + nsoe.printStackTrace(System.err); + assert false; + } catch (AlreadyExistsException aee) { + aee.printStackTrace(System.err); + assert false; + } + } +}