diff --git a/repl/src/java/org/apache/hive/repl/tasks/NotYetImplementedReplicationTask.java b/repl/src/java/org/apache/hive/repl/tasks/NotYetImplementedReplicationTask.java new file mode 100644 index 0000000..0e53fc0 --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/tasks/NotYetImplementedReplicationTask.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.repl.tasks; + +import org.apache.hive.repl.messaging.EventMessage; + +/** + * This class is there to help testing, and to help initial development + * and will be the default Replication Task for under-development replication + * tasks to override. + * + * This is not intended to be a permanent class, and will likely move to the test + * package after initial implementation. + */ + +public class NotYetImplementedReplicationTask extends ReplicationTask { + protected NotYetImplementedReplicationTask(EventMessage event) { + super(event); + } + + @Override + public boolean isActionable(){ + return false; + } + +} diff --git a/repl/src/java/org/apache/hive/repl/tasks/ReplicationTask.java b/repl/src/java/org/apache/hive/repl/tasks/ReplicationTask.java new file mode 100644 index 0000000..47ea04f --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/tasks/ReplicationTask.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.repl.tasks; + +import org.apache.hive.repl.messaging.EventMessage; + +import java.util.List; +import java.util.Map; + +/** + * ReplicationTask captures the concept of what it'd take to replicate changes from + * one warehouse to another given a notification event that captures what changed. + */ +public class ReplicationTask { + protected EventMessage event; + protected StagingDirectoryProvider srcStagingDirProvider = null; + protected StagingDirectoryProvider dstStagingDirProvider = null; + protected Map tableNameMapping = null; + protected Map dbNameMapping = null; + + /** + * Factory method to return appropriate subtype of ReplicationTask for given event + * @param event EventMessage returned by the notification subsystem + * @return corresponding ReplicationTask + */ + public static ReplicationTask create(EventMessage event) { + switch (event.getEventType()) { + case CREATE_DATABASE: + return new NotYetImplementedReplicationTask(event); + case DROP_DATABASE: + return new NotYetImplementedReplicationTask(event); + case CREATE_TABLE: + return new NotYetImplementedReplicationTask(event); + case DROP_TABLE: + return new NotYetImplementedReplicationTask(event); + case ADD_PARTITION: + return new NotYetImplementedReplicationTask(event); + case DROP_PARTITION: + return new NotYetImplementedReplicationTask(event); + default: + throw new IllegalStateException("Unrecognized Event type, no replication task available"); + } + } + + // Primary entry point is a factory method instead of ctor + // to allow for future ctor mutabulity in design + protected ReplicationTask(EventMessage event) { + this.event = event; + } + + /** + * Returns true if the replication task in question needs to create staging + * directories to complete its operation. This will mean that you will need + * to copy these directories over to the destination warehouse for each + * source-destination warehouse pair. + * If this is true, you will need to call .withSrcStagingDirProvider(...) + * and .withDstStagingDirProvider(...) before this ReplicationTask is usable + */ + public boolean needsStagingDirs(){ + return (this.event.getEventType().equals(EventMessage.EventType.ADD_PARTITION)); + } + + /** + * Returns true if this ReplicationTask is prepared with all info it needs, and is + * ready to be used + */ + public boolean isActionable(){ + if (! this.needsStagingDirs()) { + return true; + } + if ((srcStagingDirProvider != null) && (dstStagingDirProvider != null)){ + return true; + } + return false; + } + + /** + * See {@link org.apache.hive.repl.tasks.StagingDirectoryProvider} + * @param srcStagingDirProvider Staging Directory Provider for the source warehouse + * @return this + */ + public ReplicationTask withSrcStagingDirProvider(StagingDirectoryProvider srcStagingDirProvider){ + this.srcStagingDirProvider = srcStagingDirProvider; + return this; + } + + /** + * See {@link org.apache.hive.repl.tasks.StagingDirectoryProvider} + * @param dstStagingDirProvider Staging Directory Provider for the destination warehouse + * @return this replication task + */ + public ReplicationTask withDstStagingDirProvider(StagingDirectoryProvider dstStagingDirProvider){ + this.dstStagingDirProvider = dstStagingDirProvider; + return this; + } + + /** + * Allows a user to specify a table name mapping, where the keys are the names of the table + * in the source warehouse, and the values are the table names on the destination warehouse + * @param tableNameMapping + * @return this replication task + */ + public ReplicationTask withTableNameMapping(Map tableNameMapping){ + this.tableNameMapping = tableNameMapping; + return this; + } + + /** + * Allows a user to specify a db name mapping, where the keys are the names of the db + * in the source warehouse, and the values are the db names on the destination warehouse + * @param dbNameMapping + * @return this replication task + */ + public ReplicationTask withDbNameMapping(Map dbNameMapping){ + this.dbNameMapping = dbNameMapping; + return this; + } + + protected void verifyActionable() { + if (!this.isActionable()){ + throw new IllegalStateException("actionable command on task called when ReplicationTask is still not actionable."); + } + } + + /** + * Returns a list of commands to send to a hive driver on the source warehouse + * @return a list of commands to send to a hive driver on the source warehouse + */ + public List getSrcWhCommands() { + verifyActionable(); + return null; + } + + /** + * Returns a list of commands to send to a hive driver on the dest warehouse + * @return a list of commands to send to a hive driver on the dest warehouse + */ + public List getDstWhCommands() { + verifyActionable(); + return null; + } + + +} \ No newline at end of file diff --git a/repl/src/java/org/apache/hive/repl/tasks/StagingDirectoryProvider.java b/repl/src/java/org/apache/hive/repl/tasks/StagingDirectoryProvider.java new file mode 100644 index 0000000..16dd29a --- /dev/null +++ b/repl/src/java/org/apache/hive/repl/tasks/StagingDirectoryProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.repl.tasks; + +/** + * Interface for a client to provide a Staging Directory specification + */ +public interface StagingDirectoryProvider { + + /** + * Return a temporary staging directory for a given key + * @param key key for the directory, usually a name of a partition + * Note that when overriding this method, no guarantees are made about the + * contents of the key, other than that is unique per partition. + * @return A parth specification to use as a temporary staging directory + */ + String getStagingDirectory(String key); + + /** + * Trivial implementation of this interface - creates + */ + public class TrivialImpl implements StagingDirectoryProvider { + + String prefix = null; + + /** + * Trivial implementation of StagingDirectoryProvider which takes a temporary directory + * and creates directories inside that for each key. + * @param base tmp directory inside which other tmp dirs are created + * @param separator path separator. Usually should be "/" + */ + public TrivialImpl(String base,String separator){ + this.prefix = base + separator; + } + + @Override + public String getStagingDirectory(String key) { + return prefix + key.replaceAll(" ","_"); + } + } +} \ No newline at end of file diff --git a/repl/src/test/org/apache/hive/repl/tasks/ReplicationScenarioBase.java b/repl/src/test/org/apache/hive/repl/tasks/ReplicationScenarioBase.java new file mode 100644 index 0000000..71a2158 --- /dev/null +++ b/repl/src/test/org/apache/hive/repl/tasks/ReplicationScenarioBase.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.repl.tasks; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.repl.common.Constants; +import org.apache.hive.repl.listener.ReplListener; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +// Base class to do a bunch of setup that allows replication tests to be written +public class ReplicationScenarioBase extends TestCase { + + public boolean setUpBroker = true; + // set to false if you want to use an external activemq broker (say, during dev) + // otherwise, set to true for unit test runs. + + public static final int DEFAULT_BROKER_PORT = 61616; + public static final long TIMEOUT = 8000; + protected static final String TOPIC_NAME = "hrepl"; + + protected Driver driver; + protected HiveMetaStoreClient client; + protected BrokerService broker; + protected MessageConsumer consumer; + protected Connection conn; + protected int port; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + + if (setUpBroker) { + port = MetaStoreUtils.findFreePort(); + // configure the broker + broker = new BrokerService(); + broker.addConnector("tcp://localhost:"+port+"?broker.persistent=false"); + broker.start(); + } else { + port = DEFAULT_BROKER_PORT; + } + System.out.println("Using port["+port+"]"); + + + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:"+port); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, + ReplListener.class.getName()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(Constants.MSGBUS_TOPIC_PREFIX, TOPIC_NAME); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + client = new HiveMetaStoreClient(hiveConf); + } + + @Override + protected void tearDown() throws Exception { + driver.close(); + client.close(); + conn.close(); + if (setUpBroker){ + broker.stop(); + } + } + + + private void connectClient() throws JMSException { + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "tcp://localhost:"+port); + conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination topic = session.createTopic(TOPIC_NAME+".>"); + consumer = session.createConsumer(topic); + } + + /** + * Keep popping messages from the queue till we get a message of a certain type + * @param eventType The type of event we're interested in. + * @return The message. + * @throws JMSException + */ + protected Message readMessagesTill(String eventType) throws JMSException { + Message msg = readMessage(); + while ((!msg.getStringProperty(Constants.REPL_EVENT).equals(eventType))){ + System.err.println("We saw a message of type : " + msg.getStringProperty(Constants.REPL_EVENT) + ", ignoring."); + msg = readMessage(); // Keep reading next message till we get the event type we're interested in. + } + return msg; + } + + /** + * Rudimentary robust read message. + * Attempts to read a message semi-synchronously (blocking with timeout of 8 seconds) + * from the JMS queue with 5 retries if we have any issues during the read. Will throw + * a RuntimeException if we don't manage to read the message by then. + * @return The message read. + */ + protected Message readMessage(){ + int numTries = 5; + while (numTries > 0){ + try { + Message msg = consumer.receive(TIMEOUT); + if (msg != null){ + return msg; + } + System.err.print("."); + } catch (JMSException e) { + // ignore, retry. + } + numTries--; + } + throw new RuntimeException("Hit max retry limit, no messages received."); + } +}