diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fb23c40..81938af 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1589,6 +1589,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "imported on to tables that are the target of replication. If this parameter is\n" + "set, regular imports will check if the destination table(if it exists) has a " + "'repl.last.id' set on it. If so, it will fail."), + HIVE_REPL_TASK_FACTORY("hive.repl.task.factory","", + "Parameter that can be used to override which ReplicationTaskFactory will be\n" + + "used to instantiate ReplicationTask events. Override for third party repl plugins"), HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false, ""), HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false, "should rework the mapred work or not.\n" + diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 358a882..81cc8c4 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -187,7 +187,7 @@ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaEx Partition after = partitionEvent.getNewPartition(); NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_ALTER_PARTITION_EVENT, - msgFactory.buildAlterPartitionMessage(before, after).toString()); + msgFactory.buildAlterPartitionMessage(partitionEvent.getTable(),before, after).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); 
enqueue(event); @@ -223,7 +223,7 @@ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { public void onInsert(InsertEvent insertEvent) throws MetaException { NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT, msgFactory.buildInsertMessage(insertEvent.getDb(), insertEvent.getTable(), - insertEvent.getPartitions(), insertEvent.getFiles()).toString()); + insertEvent.getPartitionKeyValues(), insertEvent.getFiles()).toString()); event.setDbName(insertEvent.getDb()); event.setTableName(insertEvent.getTable()); enqueue(event); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 1718d79..da3d4da 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -55,11 +55,8 @@ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.events.InsertEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hive.hcatalog.common.HCatConstants; -import org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.hive.hcatalog.messaging.MessageFactory; import org.slf4j.Logger; @@ -150,7 +147,7 @@ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { Partition after = ape.getNewPartition(); String topicName = getTopicName(ape.getTable()); - send(messageFactory.buildAlterPartitionMessage(before, after), topicName); + 
send(messageFactory.buildAlterPartitionMessage(ape.getTable(),before, after), topicName); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java index 7412b60..10a300d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java @@ -18,7 +18,7 @@ */ package org.apache.hive.hcatalog.messaging; -import java.util.List; +import java.util.Map; /** * HCat message sent when a table is Altered. @@ -31,12 +31,12 @@ protected AlterPartitionMessage() { public abstract String getTable(); - public abstract List getValues(); + public abstract Map getKeyValues(); @Override public HCatEventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); - if (getValues() == null) throw new IllegalStateException("Partition values unset"); + if (getKeyValues() == null) throw new IllegalStateException("Partition values unset"); return super.checkValid(); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java index b25da29..be7ea10 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java @@ -20,6 +20,7 @@ package org.apache.hive.hcatalog.messaging; import java.util.List; +import java.util.Map; /** * HCat message sent when an insert is done to a table or partition. @@ -37,11 +38,11 @@ protected InsertMessage() { public abstract String getTable(); /** - * Get the list of partition values. 
Will be null if this insert is to a table and not a + * Get the map of partition keyvalues. Will be null if this insert is to a table and not a * partition. - * @return List of partition values, or null. + * @return Map of partition keyvalues, or null. */ - public abstract List getPartitionValues(); + public abstract Map getPartitionKeyValues(); /** * Get the list of files created as a result of this DML operation. May be null. diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 2b16745..aec80c1 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; /** * Abstract Factory for the construction of HCatalog message instances. 
@@ -145,11 +146,12 @@ public static MessageDeserializer getDeserializer(String format, /** * Factory method for building AlterPartitionMessage + * @param table The table in which the partition is being altered * @param before The partition before it was altered * @param after The partition after it was altered * @return a new AlterPartitionMessage */ - public abstract AlterPartitionMessage buildAlterPartitionMessage(Partition before, + public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, Partition after); /** @@ -170,5 +172,5 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Partition befor * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - List partVals, List files); + Map partVals, List files); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java index 1e2456d..4f1d104 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -40,21 +40,25 @@ Long timestamp; @JsonProperty - List values; + Map keyValues; + /** + * Default constructor, needed for Jackson. 
+ */ + public JSONAlterPartitionMessage() {} public JSONAlterPartitionMessage(String server, String servicePrincipal, String db, String table, - List values, + Map keyValues, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.timestamp = timestamp; - this.values = values; + this.keyValues = keyValues; checkValid(); } @@ -85,8 +89,8 @@ public String getTable() { } @Override - public List getValues() { - return values; + public Map getKeyValues() { + return keyValues; } @Override diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index 2848843..b057d4a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -37,6 +37,11 @@ @JsonProperty Long timestamp; + /** + * Default constructor, needed for Jackson. 
+ */ + public JSONAlterTableMessage() {} + public JSONAlterTableMessage(String server, String servicePrincipal, String db, diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java index f1554e3..8a4db15 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java @@ -19,11 +19,11 @@ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hive.hcatalog.messaging.DropTableMessage; import org.apache.hive.hcatalog.messaging.InsertMessage; import org.codehaus.jackson.annotate.JsonProperty; import java.util.List; +import java.util.Map; /** * JSON implementation of DropTableMessage. @@ -37,7 +37,10 @@ Long timestamp; @JsonProperty - List partitionValues, files; + List files; + + @JsonProperty + Map partKeyVals; /** * Default constructor, needed for Jackson. 
@@ -45,13 +48,13 @@ public JSONInsertMessage() {} public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - List partVals, List files, Long timestamp) { + Map partKeyVals, List files, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.timestamp = timestamp; - partitionValues = partVals; + this.partKeyVals = partKeyVals; this.files = files; checkValid(); } @@ -64,8 +67,8 @@ public JSONInsertMessage(String server, String servicePrincipal, String db, Stri public String getServer() { return server; } @Override - public List getPartitionValues() { - return partitionValues; + public Map getPartitionKeyValues() { + return partKeyVals; } @Override diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 06efb89..954cd3a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -109,9 +109,9 @@ public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partVals, + public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, List files) { - return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partVals, + return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partKeyVals, files, now()); } diff --git a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java b/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java index bf61dcf..827031a 100644 --- 
a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java +++ b/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java @@ -182,14 +182,14 @@ public void onMessage(Message msg) { AlterPartitionMessage message = deserializer.getAlterPartitionMessage(messageBody); Assert.assertEquals("mytbl", message.getTable()); Assert.assertEquals("mydb", message.getDB()); - Assert.assertEquals(1, message.getValues().size()); - Assert.assertEquals("2011", message.getValues().get(0)); + Assert.assertEquals(1, message.getKeyValues().size()); + Assert.assertTrue(message.getKeyValues().values().contains("2011")); HCatEventMessage message2 = MessagingUtils.getMessage(msg); Assert.assertTrue("Unexpected message-type.", message2 instanceof AlterPartitionMessage); Assert.assertEquals("mydb", message2.getDB()); Assert.assertEquals("mytbl", ((AlterPartitionMessage) message2).getTable()); - Assert.assertEquals(1, ((AlterPartitionMessage) message2).getValues().size()); - Assert.assertEquals("2011", ((AlterPartitionMessage) message2).getValues().get(0)); + Assert.assertEquals(1, ((AlterPartitionMessage) message2).getKeyValues().size()); + Assert.assertTrue(((AlterPartitionMessage) message2).getKeyValues().values().contains("2011")); } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java index f761347..e9ccb13 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java @@ -76,6 +76,19 @@ public static HCatClient create(Configuration conf) throws HCatException { abstract void 
initialize(Configuration conf) throws HCatException; /** + * Fetch configuration value on conf that the HCatClient is instantiated + * against. We do not want to expose the conf itself via a getConf(), because + * we do not want it modifiable after instantiation of the HCatClient, but + * modules that get called from HCatClient often need to know about how + * HCatClient is configured, so we want a read-only interface for it. + * + * @param key keyname to look up + * @param defaultVal default value to furnish in case the key does not exist + * @return value for given key, and defaultVal if key is not present in conf + */ + public abstract String getConfVal(String key, String defaultVal); + + /** * Get all existing databases that match the given * pattern. The matching occurs as per Java regular expressions * diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java index 64375bc..3b2cd38 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java @@ -826,6 +826,11 @@ void initialize(Configuration conf) throws HCatException { } + @Override + public String getConfVal(String key, String defaultVal) { + return hiveConfig.get(key,defaultVal); + } + private Table getHiveTableLike(String dbName, String existingTblName, String newTableName, boolean isExternal, String location) throws HCatException { diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java index 2f830fd..0cc6117 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java +++ 
b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java @@ -34,7 +34,7 @@ public enum Scope { DB, TABLE, UNKNOWN }; - HCatNotificationEvent(NotificationEvent event) { + public HCatNotificationEvent(NotificationEvent event) { eventId = event.getEventId(); eventTime = event.getEventTime(); eventType = event.getEventType(); diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java index 0d714ff..ed40af7 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.api; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -259,6 +260,17 @@ public String getSerDe() { return this.sd.getSerdeInfo().getParameters(); } + public HCatPartition parameters(Map parameters){ + if (this.parameters == null){ + this.parameters = new HashMap(); + } + if (!this.parameters.equals(parameters)) { + this.parameters.clear(); + this.parameters.putAll(parameters); + } + return this; + } + public Map getParameters() { return this.parameters; } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java index 3b8e2ae..ed5ddbe 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.api.repl; +import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hive.hcatalog.api.HCatClient; @@ -26,9 +27,9 @@ import org.apache.hive.hcatalog.common.HCatException; import java.util.Iterator; +import java.util.List; public class HCatReplicationTaskIterator implements Iterator{ - private Iterator notifIter = null; private class HCatReplicationTaskIteratorNotificationFilter implements IMetaStoreClient.NotificationFilter { @@ -60,10 +61,24 @@ public boolean accept(NotificationEvent event) { } } + private HCatClient hcatClient; + private IMetaStoreClient.NotificationFilter filter; + private int maxEvents; + + private int batchSize; + + private Iterator batchIter = null; + private List batch = null; + private long pos; + private long maxPos; + private int eventCount; + public HCatReplicationTaskIterator( HCatClient hcatClient, long eventFrom, int maxEvents, String dbName, String tableName) throws HCatException { - init(hcatClient,eventFrom,maxEvents, new HCatReplicationTaskIteratorNotificationFilter(dbName,tableName)); + // using init(..) instead of this(..) because the new HCatReplicationTaskIteratorNotificationFilter + // is an operation that needs to run before delegating to the other ctor, and this messes up chaining + // ctors + init(hcatClient,eventFrom,maxEvents, new HCatReplicationTaskIteratorNotificationFilter(dbName,tableName)); } public HCatReplicationTaskIterator( @@ -71,19 +86,73 @@ public HCatReplicationTaskIterator( IMetaStoreClient.NotificationFilter filter) throws HCatException{ init(hcatClient,eventFrom,maxEvents,filter); } - private void init(HCatClient hcatClient, long eventFrom, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws HCatException { + + private void init( + HCatClient hcatClient, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws HCatException { // Simple implementation for now, this will later expand to do DAG evaluation. 
- this.notifIter = hcatClient.getNextNotification(eventFrom, maxEvents,filter).iterator(); + this.hcatClient = hcatClient; + this.filter = filter; + this.pos = eventFrom; + if (maxEvents < 1){ + // 0 or -1 implies fetch everything + this.maxEvents = Integer.MAX_VALUE; + } else { + this.maxEvents = maxEvents; + } + batchSize = Integer.parseInt( + hcatClient.getConfVal(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname,"50")); + this.eventCount = 0; + this.maxPos = hcatClient.getCurrentNotificationEventId(); + } + + private void fetchNextBatch() throws HCatException { + batch = hcatClient.getNextNotification(pos, batchSize, filter); + batchIter = batch.iterator(); + if (batch.isEmpty()){ + pos += batchSize; + if (pos < maxPos){ + fetchNextBatch(); + // This way, the only way the recursive stack of fetchNextBatch returns is if: + // a) We got a nonempty result, and we can consume + // b) We reached the end of the queue, and there are no more events. + // So, when we return from the fetchNextBatch() stack, if we have no more + // results in batch, we're done. + } + } } @Override public boolean hasNext() { - return notifIter.hasNext(); + if (eventCount >= maxEvents){ + // If we've already satisfied the number of events we were supposed to deliver, we end it. + return false; + } + if ((batchIter != null) && (batchIter.hasNext())){ + // If we have a valid batchIter and it has more elements, return them. + return true; + } + // If we're here, we want more events, and either batchIter is null, or batchIter + // has reached the end of the current batch. Let's fetch the next batch. + try { + fetchNextBatch(); + } catch (HCatException e) { + // Regrettable that we have to wrap the HCatException into a RuntimeException, + // but throwing the exception is the appropriate result here, and hasNext() + // signature will only allow RuntimeExceptions. 
Iterator.hasNext() really + // should have allowed IOExceptions + throw new RuntimeException(e); + } + // New batch has been fetched. If it's not empty, we have more elements to process. + return !batch.isEmpty(); } @Override public ReplicationTask next() { - return ReplicationTask.create(notifIter.next()); + eventCount++; + HCatNotificationEvent ev = batchIter.next(); + pos = ev.getEventId(); + return ReplicationTask.create(hcatClient,ev); } @Override diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java index 00f6d4e..bb9ff15 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java @@ -26,12 +26,11 @@ import java.util.List; /** - * This class is there to help testing, and to help initial development - * and will be the default Replication Task for under-development replication - * tasks to override. + * Noop replication task - a replication task that is actionable, + * does not need any further info, and returns NoopCommands. * - * This is not intended to be a permanent class, and will likely move to the test - * package after initial implementation. + * Useful for testing, and also for tasks that need to be represented + * but actually do nothing. 
*/ public class NoopReplicationTask extends ReplicationTask { diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java index 4bd3e81..e73cc0c 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.api.repl; import com.google.common.base.Function; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.hcatalog.api.HCatClient; import org.apache.hive.hcatalog.api.HCatNotificationEvent; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.MessageFactory; @@ -36,9 +38,11 @@ protected Function dbNameMapping = null; protected static MessageFactory messageFactory = MessageFactory.getInstance(); + private static Factory factoryInstance = null; + private static String factoryClassName = null; public interface Factory { - public ReplicationTask create(HCatNotificationEvent event); + public ReplicationTask create(HCatClient client, HCatNotificationEvent event); } /** @@ -48,7 +52,7 @@ */ public static class NoopFactory implements Factory { @Override - public ReplicationTask create(HCatNotificationEvent event) { + public ReplicationTask create(HCatClient client, HCatNotificationEvent event) { // TODO : Java 1.7+ support using String with switches, but IDEs don't all seem to know that. // If casing is fine for now. But we should eventually remove this. Also, I didn't want to // create another enum just for this. 
@@ -77,29 +81,71 @@ public ReplicationTask create(HCatNotificationEvent event) { } } - private static Factory factoryInstance = null; - private static Factory getFactoryInstance() { + private static Factory getFactoryInstance(HCatClient client) { if (factoryInstance == null){ - // TODO: Eventually, we'll have a bit here that looks at a config param to instantiate - // the appropriate factory, with EXIMFactory being the default - that allows - // others to implement their own ReplicationTask.Factory for other replication - // implementations. - // That addition will be brought in by the EXIMFactory patch. - factoryInstance = new NoopFactory(); + createFactoryInstance(client); } return factoryInstance; } /** + * Create factory instance for instantiating ReplicationTasks. + * + * The order precedence is as follows: + * + * a) If a factory has already been instantiated, and is valid, use it. + * b) If a factoryClassName has been provided, through .resetFactory(), attempt to instantiate that. + * Throw an exception if instantiation fails. (This is useful for testing) + * c) If a hive.repl.task.factory has been set in the default hive conf, use that. Throw an + * exception if instantiation fails. + * d) Default to NoopFactory. + */ + private synchronized static void createFactoryInstance(HCatClient client) { + if (factoryInstance == null){ + // instantiate new factory instance only if current one is not valid. + if (factoryClassName == null){ + // figure out which factory we're instantiating from HiveConf iff it's not been set on us directly. + factoryClassName = client.getConfVal(HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname,""); + } + if ((factoryClassName != null) && (!factoryClassName.isEmpty())){ + try { + Class factoryClass = (Class) Class.forName(factoryClassName); + factoryInstance = factoryClass.newInstance(); + } catch (Exception e) { + factoryClassName = null; // reset the classname for future evaluations. 
+ throw new RuntimeException("Error instantiating ReplicationTask.Factory " + + HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname+"="+factoryClassName); + } + } else { + // default to NoopFactory. + factoryInstance = new NoopFactory(); + } + } + } + + /** + * Package scoped method used for testing - allows resetting the ReplicationTaskFactory used + * @param factoryClass The new ReplicationTaskFactory to use. + */ + public static void resetFactory(Class factoryClass) { + if (factoryClass != null){ + factoryClassName = factoryClass.getName(); + } else { + factoryClassName = null; + } + factoryInstance = null; + } + + /** * Factory method to return appropriate subtype of ReplicationTask for given event * @param event HCatEventMessage returned by the notification subsystem * @return corresponding ReplicationTask */ - public static ReplicationTask create(HCatNotificationEvent event){ + public static ReplicationTask create(HCatClient client, HCatNotificationEvent event){ if (event == null){ throw new IllegalArgumentException("event should not be null"); } - return getFactoryInstance().create(event); + return getFactoryInstance(client).create(client,event); } // Primary entry point is a factory method instead of ctor @@ -168,7 +214,7 @@ public ReplicationTask withDstStagingDirProvider(StagingDirectoryProvider dstSta * That way, the default will then be that the destination db name is the same as the src db name * * If you want to use a Map mapping instead of a Function, - * simply call this function as .withTableNameMapping(com.google.common.base.Functions.forMap(tableMap)) + * simply call this function as .withTableNameMapping(ReplicationUtils.mapBasedFunction(tableMap)) * @param tableNameMapping * @return this replication task */ @@ -185,7 +231,7 @@ public ReplicationTask withTableNameMapping(Function tableNameMap * That way, the default will then be that the destination db name is the same as the src db name * * If you want to use a Map mapping instead of a Function, - 
* simply call this function as .withDb(com.google.common.base.Functions.forMap(dbMap)) + * simply call this function as .withDbNameMapping(ReplicationUtils.mapBasedFunction(dbMap)) * @param dbNameMapping * @return this replication task */ diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java index 299a25d..1e7901d 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java @@ -41,7 +41,7 @@ public class ReplicationUtils { - private final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString(); + public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString(); private ReplicationUtils(){ // dummy private constructor, since this class is a collection of static utility methods. @@ -155,13 +155,32 @@ public static String toStringWordCharsOnly(String s){ } /** + * Utility function to use in conjunction with .withDbNameMapping / .withTableNameMapping, + * if we desire usage of a Map instead of implementing a Function + */ + public static Function mapBasedFunction(final Map m){ + return new Function(){ + + @Nullable + @Override + public String apply(@Nullable String s) { + if ((m == null) || (!m.containsKey(s))){ + return s; + } + return m.get(s); + } + }; + } + + /** * Return a mapping from a given map function if available, and the key itself if not. */ public static String mapIfMapAvailable(String s, Function mapping){ try { return mapping.apply(s); } catch (IllegalArgumentException iae){ - // The key wasn't present in the mapping, return the key itself, since no mapping was available + // The key wasn't present in the mapping, and the function didn't take care of returning + // a default value.
We return the key itself, since no mapping was available return s; } } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropDatabaseCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropDatabaseCommand.java new file mode 100644 index 0000000..fcdf834 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropDatabaseCommand.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class DropDatabaseCommand implements Command { + private String dbName = null; + private long eventId; + + public DropDatabaseCommand(String dbName, long eventId) { + this.dbName = dbName; + this.eventId = eventId; + } + + /** + * Trivial ctor to support Writable reflections instantiation + * do not expect to use this object as-is, unless you call + * readFields after using this ctor + */ + public DropDatabaseCommand(){ + } + + @Override + public List get() { + // DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE]; + StringBuilder sb = new StringBuilder(); + sb.append("DROP DATABASE IF EXISTS "); + sb.append(dbName); + sb.append(" CASCADE"); + return Collections.singletonList(sb.toString()); + } + + @Override + public boolean isRetriable() { + return true; + } + + @Override + public boolean isUndoable() { + return false; + } + + @Override + public List getUndo() { + throw new UnsupportedOperationException("getUndo called on command that returned false for isUndoable"); + } + + @Override + public List cleanupLocationsPerRetry() { + return Collections.emptyList(); + } + + @Override + public List cleanupLocationsAfterEvent() { + return Collections.emptyList(); + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, dbName); + ReaderWriter.writeDatum(dataOutput, eventId); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + dbName = (String)ReaderWriter.readDatum(dataInput); + eventId = (Long) ReaderWriter.readDatum(dataInput); + } + +} diff --git 
a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropPartitionCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropPartitionCommand.java new file mode 100644 index 0000000..7fcff87 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropPartitionCommand.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.hive.hcatalog.api.repl.commands;

import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.data.ReaderWriter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * {@link Command} that drops a single partition on the destination warehouse.
 * Writable, so instances can be serialized between replication components.
 */
public class DropPartitionCommand implements Command {
  private long eventId;
  private String dbName;
  private String tableName;
  private Map<String, String> ptnDesc;
  private boolean isReplicatedEvent = false;

  /**
   * @param dbName database the table lives in
   * @param tableName table to drop the partition from
   * @param ptnDesc partition key/value spec of the partition to drop
   * @param isReplicatedEvent whether to tag the DDL with a FOR REPLICATION clause
   * @param eventId notification event id this command replays
   */
  public DropPartitionCommand(String dbName, String tableName, Map<String, String> ptnDesc,
      boolean isReplicatedEvent, long eventId) {
    this.dbName = dbName;
    this.tableName = tableName;
    this.ptnDesc = ptnDesc;
    this.isReplicatedEvent = isReplicatedEvent;
    this.eventId = eventId;
  }

  /**
   * Trivial ctor to support Writable reflections instantiation
   * do not expect to use this object as-is, unless you call
   * readFields after using this ctor
   */
  public DropPartitionCommand() {
  }

  @Override
  public List<String> get() {
    // ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec, PARTITION partition_spec,...;
    StringBuilder sb = new StringBuilder();
    sb.append("ALTER TABLE ");
    sb.append(dbName);
    sb.append('.');
    sb.append(tableName);
    sb.append(" DROP IF EXISTS");
    sb.append(ReplicationUtils.partitionDescriptor(ptnDesc));
    if (isReplicatedEvent) {
      sb.append(" FOR REPLICATION(\'");
      sb.append(eventId);
      sb.append("\')");
    }
    return Collections.singletonList(sb.toString());
  }

  @Override
  public boolean isRetriable() {
    // DROP IF EXISTS is idempotent, so a retry is always safe.
    return true;
  }

  @Override
  public boolean isUndoable() {
    return false;
  }

  @Override
  public List<String> getUndo() {
    // Fixed message typo: was "command that does returned false".
    throw new UnsupportedOperationException("getUndo called on command that returned false for isUndoable");
  }

  @Override
  public List<String> cleanupLocationsPerRetry() {
    return Collections.emptyList();
  }

  @Override
  public List<String> cleanupLocationsAfterEvent() {
    return Collections.emptyList();
  }

  @Override
  public long getEventId() {
    return eventId;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    ReaderWriter.writeDatum(dataOutput, dbName);
    ReaderWriter.writeDatum(dataOutput, tableName);
    ReaderWriter.writeDatum(dataOutput, ptnDesc);
    ReaderWriter.writeDatum(dataOutput, isReplicatedEvent);
    ReaderWriter.writeDatum(dataOutput, eventId);
  }

  @SuppressWarnings("unchecked") // ReaderWriter returns Object; map element types set by write().
  @Override
  public void readFields(DataInput dataInput) throws IOException {
    // Must read back in exactly the order written by write().
    dbName = (String) ReaderWriter.readDatum(dataInput);
    tableName = (String) ReaderWriter.readDatum(dataInput);
    ptnDesc = (Map<String, String>) ReaderWriter.readDatum(dataInput);
    isReplicatedEvent = (Boolean) ReaderWriter.readDatum(dataInput);
    eventId = (Long) ReaderWriter.readDatum(dataInput);
  }

}
package org.apache.hive.hcatalog.api.repl.commands;

import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.data.ReaderWriter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
 * {@link Command} that drops a table on the destination warehouse.
 * Writable, so instances can be serialized between replication components.
 */
public class DropTableCommand implements Command {
  private long eventId;
  private String dbName = null;
  private String tableName = null;
  private boolean isReplicatedEvent = false;

  /**
   * @param dbName database the table lives in
   * @param tableName table to drop
   * @param isReplicatedEvent whether to tag the DDL with a FOR REPLICATION clause
   * @param eventId notification event id this command replays
   */
  public DropTableCommand(String dbName, String tableName, boolean isReplicatedEvent, long eventId) {
    this.dbName = dbName;
    this.tableName = tableName;
    this.isReplicatedEvent = isReplicatedEvent;
    this.eventId = eventId;
  }

  /**
   * Trivial ctor to support Writable reflections instantiation
   * do not expect to use this object as-is, unless you call
   * readFields after using this ctor
   */
  public DropTableCommand() {
  }

  @Override
  public List<String> get() {
    // DROP TABLE [IF EXISTS] table_name;
    StringBuilder sb = new StringBuilder();
    sb.append("DROP TABLE IF EXISTS ");
    sb.append(dbName);
    sb.append('.');
    sb.append(tableName); // TODO: Handle quoted tablenames
    if (isReplicatedEvent) {
      sb.append(" FOR REPLICATION(\'");
      sb.append(eventId);
      sb.append("\')");
    }
    return Collections.singletonList(sb.toString());
  }

  @Override
  public boolean isRetriable() {
    // DROP IF EXISTS is idempotent, so a retry is always safe.
    return true;
  }

  @Override
  public boolean isUndoable() {
    return false;
  }

  @Override
  public List<String> getUndo() {
    // Fixed message typo: was "command that does returned false".
    throw new UnsupportedOperationException("getUndo called on command that returned false for isUndoable");
  }

  @Override
  public List<String> cleanupLocationsPerRetry() {
    return Collections.emptyList();
  }

  @Override
  public List<String> cleanupLocationsAfterEvent() {
    return Collections.emptyList();
  }

  @Override
  public long getEventId() {
    return eventId;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    ReaderWriter.writeDatum(dataOutput, dbName);
    ReaderWriter.writeDatum(dataOutput, tableName);
    ReaderWriter.writeDatum(dataOutput, isReplicatedEvent);
    ReaderWriter.writeDatum(dataOutput, eventId);
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    // Must read back in exactly the order written by write().
    dbName = (String) ReaderWriter.readDatum(dataInput);
    tableName = (String) ReaderWriter.readDatum(dataInput);
    isReplicatedEvent = (Boolean) ReaderWriter.readDatum(dataInput);
    eventId = (Long) ReaderWriter.readDatum(dataInput);
  }

}
package org.apache.hive.hcatalog.api.repl.commands;

import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.data.ReaderWriter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * {@link Command} that exports a table (or one partition of it) to a staging
 * location, tagged FOR REPLICATION. Writable, so instances can be serialized
 * between replication components.
 */
public class ExportCommand implements Command {
  private String exportLocation;
  private String dbName = null;
  private String tableName = null;
  private Map<String, String> ptnDesc = null;
  private long eventId;
  private boolean isMetadataOnly = false;

  public ExportCommand(String dbName, String tableName, Map<String, String> ptnDesc,
      String exportLocation, boolean isMetadataOnly, long eventId) {
    this.dbName = dbName;
    this.tableName = tableName;
    this.ptnDesc = ptnDesc;
    this.exportLocation = exportLocation;
    this.isMetadataOnly = isMetadataOnly;
    this.eventId = eventId;
  }

  /**
   * Trivial ctor to support Writable reflections instantiation
   * do not expect to use this object as-is, unless you call
   * readFields after using this ctor
   */
  public ExportCommand() {
  }

  @Override
  public List<String> get() {
    // EXPORT TABLE tablename [PARTITION (part_column="value"[, ...])]
    //   TO 'export_target_path' FOR [METADATA] REPLICATION('eventId')
    String metadataClause = isMetadataOnly ? "METADATA " : "";
    String ddl = "EXPORT TABLE " + dbName + "." + tableName // TODO: Handle quoted tablenames
        + ReplicationUtils.partitionDescriptor(ptnDesc)
        + " TO '" + exportLocation + "' FOR "
        + metadataClause + "REPLICATION('" + eventId + "')";
    return Collections.singletonList(ddl);
  }

  @Override
  public boolean isRetriable() {
    return true; // Export is trivially retriable (after clearing out the staging dir provided.)
  }

  @Override
  public boolean isUndoable() {
    return true; // Export is trivially undoable - in that nothing needs doing to undo it.
  }

  @Override
  public List<String> getUndo() {
    return Collections.emptyList();
  }

  @Override
  public List<String> cleanupLocationsPerRetry() {
    return Collections.singletonList(exportLocation);
  }

  @Override
  public List<String> cleanupLocationsAfterEvent() {
    return Collections.singletonList(exportLocation);
  }

  @Override
  public long getEventId() {
    return eventId;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    ReaderWriter.writeDatum(dataOutput, dbName);
    ReaderWriter.writeDatum(dataOutput, tableName);
    ReaderWriter.writeDatum(dataOutput, ptnDesc);
    ReaderWriter.writeDatum(dataOutput, exportLocation);
    ReaderWriter.writeDatum(dataOutput, isMetadataOnly);
    ReaderWriter.writeDatum(dataOutput, eventId);
  }

  @SuppressWarnings("unchecked") // ReaderWriter returns Object; map element types set by write().
  @Override
  public void readFields(DataInput dataInput) throws IOException {
    // Must read back in exactly the order written by write().
    dbName = (String) ReaderWriter.readDatum(dataInput);
    tableName = (String) ReaderWriter.readDatum(dataInput);
    ptnDesc = (Map<String, String>) ReaderWriter.readDatum(dataInput);
    exportLocation = (String) ReaderWriter.readDatum(dataInput);
    isMetadataOnly = (Boolean) ReaderWriter.readDatum(dataInput);
    eventId = (Long) ReaderWriter.readDatum(dataInput);
  }

}
package org.apache.hive.hcatalog.api.repl.commands;

import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.data.ReaderWriter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * {@link Command} that imports a previously-exported table (or one partition of it)
 * from a staging location. Writable, so instances can be serialized between
 * replication components.
 *
 * NOTE: The current implementation does not allow importing to an "EXTERNAL" location.
 * This is intentional, since we want the destination tables to be "managed" tables.
 * If this assumption should change at some point in the future, ImportSemanticAnalyzer
 * will need some of its checks changed to allow for "replacing" external tables.
 */
public class ImportCommand implements Command {
  private String importLocation;
  private String dbName = null;
  private String tableName = null;
  private Map<String, String> ptnDesc = null;
  private long eventId;
  private boolean isDefinitionOnly = false;

  public ImportCommand(String dbName, String tableName, Map<String, String> ptnDesc,
      String importLocation, boolean isDefinitionOnly, long eventId) {
    this.dbName = dbName;
    this.tableName = tableName;
    this.ptnDesc = ptnDesc;
    this.importLocation = importLocation;
    this.isDefinitionOnly = isDefinitionOnly;
    this.eventId = eventId;
  }

  /**
   * Trivial ctor to support Writable reflections instantiation
   * do not expect to use this object as-is, unless you call
   * readFields after using this ctor
   */
  public ImportCommand() {
  }

  @Override
  public List<String> get() {
    // IMPORT [[EXTERNAL] TABLE new_or_original_tablename [PARTITION (part_column="value"[, ...])]]
    //   FROM 'source_path' [LOCATION 'import_target_path']
    String ddl = "IMPORT TABLE " + dbName + "." + tableName // TODO: Handle quoted tablenames
        + ReplicationUtils.partitionDescriptor(ptnDesc)
        + " FROM '" + importLocation + "'";
    return Collections.singletonList(ddl);
  }

  @Override
  public boolean isRetriable() {
    // Repl imports are replace-imports, and thus, are idempotent.
    // Note that this assumes that this ImportCommand is running on an export dump
    // created using EXPORT ... FOR REPLICATION. If the scope of ImportCommand
    // were to eventually expand to importing dumps created by regular exports,
    // then this needs updating.
    return true;
  }

  @Override
  public boolean isUndoable() {
    // Alters and replacements are not undoable if they've taken effect already. They are retriable though.
    // Creates are undoable, but we cannot differentiate between creates, alters and replacements from a Command level.
    return false;
  }

  @Override
  public List<String> getUndo() {
    throw new UnsupportedOperationException("Attempted to getUndo() on a repl import that does not support undo.");
  }

  @Override
  public List<String> cleanupLocationsPerRetry() {
    return Collections.emptyList();
  }

  @Override
  public List<String> cleanupLocationsAfterEvent() {
    // Once the event has been fully consumed, the staged dump can be removed.
    return Collections.singletonList(importLocation);
  }

  @Override
  public long getEventId() {
    return eventId;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    ReaderWriter.writeDatum(dataOutput, dbName);
    ReaderWriter.writeDatum(dataOutput, tableName);
    ReaderWriter.writeDatum(dataOutput, ptnDesc);
    ReaderWriter.writeDatum(dataOutput, importLocation);
    ReaderWriter.writeDatum(dataOutput, isDefinitionOnly);
    ReaderWriter.writeDatum(dataOutput, eventId);
  }

  @SuppressWarnings("unchecked") // ReaderWriter returns Object; map element types set by write().
  @Override
  public void readFields(DataInput dataInput) throws IOException {
    // Must read back in exactly the order written by write().
    dbName = (String) ReaderWriter.readDatum(dataInput);
    tableName = (String) ReaderWriter.readDatum(dataInput);
    ptnDesc = (Map<String, String>) ReaderWriter.readDatum(dataInput);
    importLocation = (String) ReaderWriter.readDatum(dataInput);
    isDefinitionOnly = (Boolean) ReaderWriter.readDatum(dataInput);
    eventId = (Long) ReaderWriter.readDatum(dataInput);
  }

}
private long eventId; + /** + * Trivial ctor to support Writable reflections instantiation + * do not expect to use this object as-is, unless you call + * readFields after using this ctor + */ public NoopCommand(){ - // trivial ctor to support Writable reflections instantiation - // do not expect to use this object as-is, unless you call - // readFields after using this ctor } public NoopCommand(long eventId){ @@ -52,7 +54,7 @@ public NoopCommand(long eventId){ @Override public List get() { - return new ArrayList(); + return Collections.emptyList(); } @Override @@ -67,17 +69,17 @@ public boolean isUndoable() { @Override public List getUndo() { - return new ArrayList(); + return Collections.emptyList(); } @Override public List cleanupLocationsPerRetry() { - return new ArrayList(); + return Collections.emptyList(); } @Override public List cleanupLocationsAfterEvent() { - return new ArrayList(); + return Collections.emptyList(); } @Override @@ -87,12 +89,12 @@ public long getEventId() { @Override public void write(DataOutput dataOutput) throws IOException { - ReaderWriter.writeDatum(dataOutput, Long.valueOf(eventId)); + ReaderWriter.writeDatum(dataOutput, eventId); } @Override public void readFields(DataInput dataInput) throws IOException { - eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + eventId = (Long) ReaderWriter.readDatum(dataInput); } } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AddPartitionReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AddPartitionReplicationTask.java new file mode 100644 index 0000000..b721fd2 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AddPartitionReplicationTask.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
package org.apache.hive.hcatalog.api.repl.exim;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.hive.hcatalog.api.HCatNotificationEvent;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationTask;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.api.repl.commands.ExportCommand;
import org.apache.hive.hcatalog.api.repl.commands.ImportCommand;
import org.apache.hive.hcatalog.api.repl.commands.NoopCommand;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.messaging.AddPartitionMessage;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;

/**
 * ReplicationTask for ADD_PARTITION events: exports each newly added partition
 * on the source warehouse and imports it on the destination warehouse.
 */
public class AddPartitionReplicationTask extends ReplicationTask {

  AddPartitionMessage addPartitionMessage = null;

  public AddPartitionReplicationTask(HCatNotificationEvent event) {
    super(event);
    validateEventType(event, HCatConstants.HCAT_ADD_PARTITION_EVENT);
    addPartitionMessage = messageFactory.getDeserializer().getAddPartitionMessage(event.getMessage());
  }

  public boolean needsStagingDirs() {
    // we need staging directories as long as a single partition needed addition
    return !addPartitionMessage.getPartitions().isEmpty();
  }

  /**
   * Builds the staging-dir key for one partition. Deliberately uses the SOURCE
   * db/table names on both sides, so export and import resolve the same key.
   */
  private String stagingKey(Map<String, String> ptnDesc) {
    return ReplicationUtils.getUniqueKey(
        getEvent().getEventId(),
        addPartitionMessage.getDB(),
        addPartitionMessage.getTable(),
        ptnDesc);
  }

  public Iterable<? extends Command> getSrcWhCommands() {
    verifyActionable();
    if (addPartitionMessage.getPartitions().isEmpty()) {
      // Nothing was added; emit a no-op so the event is still acknowledged.
      return Collections.singletonList(new NoopCommand(event.getEventId()));
    }

    // Lazily produce one export per added partition.
    return Iterables.transform(addPartitionMessage.getPartitions(),
        new Function<Map<String, String>, Command>() {
          @Override
          public Command apply(@Nullable Map<String, String> ptnDesc) {
            return new ExportCommand(
                addPartitionMessage.getDB(),
                addPartitionMessage.getTable(),
                ptnDesc,
                srcStagingDirProvider.getStagingDirectory(stagingKey(ptnDesc)),
                false,
                event.getEventId());
          }
        });
  }

  public Iterable<? extends Command> getDstWhCommands() {
    verifyActionable();
    if (addPartitionMessage.getPartitions().isEmpty()) {
      // Nothing was added; emit a no-op so the event is still acknowledged.
      return Collections.singletonList(new NoopCommand(event.getEventId()));
    }

    // Destination names may be remapped, but the staging key stays keyed by source names.
    final String dstDbName = ReplicationUtils.mapIfMapAvailable(addPartitionMessage.getDB(), dbNameMapping);
    final String dstTableName = ReplicationUtils.mapIfMapAvailable(addPartitionMessage.getTable(), tableNameMapping);

    return Iterables.transform(addPartitionMessage.getPartitions(),
        new Function<Map<String, String>, Command>() {
          @Override
          public Command apply(@Nullable Map<String, String> ptnDesc) {
            return new ImportCommand(
                dstDbName,
                dstTableName,
                ptnDesc,
                dstStagingDirProvider.getStagingDirectory(stagingKey(ptnDesc)),
                false,
                event.getEventId());
          }
        });
  }
}
package org.apache.hive.hcatalog.api.repl.exim;

import org.apache.hive.hcatalog.api.HCatNotificationEvent;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationTask;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.api.repl.commands.ExportCommand;
import org.apache.hive.hcatalog.api.repl.commands.ImportCommand;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.messaging.AlterPartitionMessage;

import java.util.Collections;

/**
 * ReplicationTask for ALTER_PARTITION events: replicates the change via a
 * metadata-only export on the source and a matching import on the destination.
 */
public class AlterPartitionReplicationTask extends ReplicationTask {

  AlterPartitionMessage alterPartitionMessage = null;

  public AlterPartitionReplicationTask(HCatNotificationEvent event) {
    super(event);
    validateEventType(event, HCatConstants.HCAT_ALTER_PARTITION_EVENT);
    alterPartitionMessage = messageFactory.getDeserializer().getAlterPartitionMessage(event.getMessage());
  }

  public boolean needsStagingDirs() {
    return true;
  }

  /**
   * Staging-dir key for this event. Deliberately built from the SOURCE db/table
   * names on both sides, so export and import resolve the same key.
   */
  private String stagingKey() {
    return ReplicationUtils.getUniqueKey(
        getEvent().getEventId(),
        alterPartitionMessage.getDB(),
        alterPartitionMessage.getTable(),
        alterPartitionMessage.getKeyValues());
  }

  public Iterable<? extends Command> getSrcWhCommands() {
    verifyActionable();

    Command export = new ExportCommand(
        alterPartitionMessage.getDB(),
        alterPartitionMessage.getTable(),
        alterPartitionMessage.getKeyValues(),
        srcStagingDirProvider.getStagingDirectory(stagingKey()),
        true, // metadata-only: an alter changes metadata, not data
        event.getEventId());
    return Collections.singletonList(export);
  }

  public Iterable<? extends Command> getDstWhCommands() {
    verifyActionable();

    // Destination names may be remapped, but the staging key stays keyed by source names.
    final String dstDbName = ReplicationUtils.mapIfMapAvailable(alterPartitionMessage.getDB(), dbNameMapping);
    final String dstTableName = ReplicationUtils.mapIfMapAvailable(alterPartitionMessage.getTable(), tableNameMapping);

    Command doImport = new ImportCommand(
        dstDbName,
        dstTableName,
        alterPartitionMessage.getKeyValues(),
        dstStagingDirProvider.getStagingDirectory(stagingKey()),
        true, // metadata-only: an alter changes metadata, not data
        event.getEventId());
    return Collections.singletonList(doImport);
  }
}
package org.apache.hive.hcatalog.api.repl.exim;

import org.apache.hive.hcatalog.api.HCatNotificationEvent;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationTask;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.api.repl.commands.ExportCommand;
import org.apache.hive.hcatalog.api.repl.commands.ImportCommand;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.messaging.AlterTableMessage;

import java.util.Collections;

/**
 * ReplicationTask for ALTER_TABLE events: replicates the change via a
 * metadata-only export on the source and a matching import on the destination.
 */
public class AlterTableReplicationTask extends ReplicationTask {
  private final AlterTableMessage alterTableMessage;

  public AlterTableReplicationTask(HCatNotificationEvent event) {
    super(event);
    validateEventType(event, HCatConstants.HCAT_ALTER_TABLE_EVENT);
    alterTableMessage = messageFactory.getDeserializer().getAlterTableMessage(event.getMessage());
  }

  public boolean needsStagingDirs() {
    return true;
  }

  /**
   * Staging-dir key for this event. Deliberately built from the SOURCE db/table
   * names on both sides, so export and import resolve the same key.
   */
  private String stagingKey() {
    return ReplicationUtils.getUniqueKey(
        getEvent().getEventId(),
        alterTableMessage.getDB(),
        alterTableMessage.getTable(),
        null); // table-level operation: no partition spec
  }

  public Iterable<? extends Command> getSrcWhCommands() {
    verifyActionable();
    Command export = new ExportCommand(
        alterTableMessage.getDB(),
        alterTableMessage.getTable(),
        null,
        srcStagingDirProvider.getStagingDirectory(stagingKey()),
        true, // metadata-only: an alter changes metadata, not data
        event.getEventId());
    return Collections.singletonList(export);
  }

  public Iterable<? extends Command> getDstWhCommands() {
    verifyActionable();
    // Destination names may be remapped, but the staging key stays keyed by source names.
    Command doImport = new ImportCommand(
        ReplicationUtils.mapIfMapAvailable(alterTableMessage.getDB(), dbNameMapping),
        ReplicationUtils.mapIfMapAvailable(alterTableMessage.getTable(), tableNameMapping),
        null,
        dstStagingDirProvider.getStagingDirectory(stagingKey()),
        true, // metadata-only: an alter changes metadata, not data
        event.getEventId());
    return Collections.singletonList(doImport);
  }
}
+ */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.NoopReplicationTask; +import org.apache.hive.hcatalog.common.HCatConstants; + +public class CreateDatabaseReplicationTask extends NoopReplicationTask { + + // "CREATE DATABASE" is specifically not replicated across, per design, since if a user + // drops a database and recreates another with the same one, we want to distinguish + // between the two. We will replicate the drop across, but after that, the goal is + // that if a new db is created, a new replication definition should be created in + // the replication implementer above this. Thus, we extend NoopReplicationTask and + // the only additional thing we do is validate event type. + + public CreateDatabaseReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_CREATE_DATABASE_EVENT); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateTableReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateTableReplicationTask.java new file mode 100644 index 0000000..2441014 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateTableReplicationTask.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.CreateTableMessage; + +import java.util.Collections; + +public class CreateTableReplicationTask extends ReplicationTask { + + private CreateTableMessage createTableMessage = null; + + public CreateTableReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_CREATE_TABLE_EVENT); + createTableMessage = messageFactory.getDeserializer().getCreateTableMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return true; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + final String dbName = createTableMessage.getDB(); + final String tableName = createTableMessage.getTable(); + return Collections.singletonList(new ExportCommand( + dbName, + tableName, + null, + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, + tableName, + null) + ), + false, + event.getEventId() + )); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dbName = createTableMessage.getDB(); + final 
String tableName = createTableMessage.getTable(); + return Collections.singletonList(new ImportCommand( + ReplicationUtils.mapIfMapAvailable(dbName, dbNameMapping), + ReplicationUtils.mapIfMapAvailable(tableName, tableNameMapping), + null, + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, // Note - important to retain the same key as the export + tableName, + null) + ), + false, + event.getEventId() + )); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropDatabaseReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropDatabaseReplicationTask.java new file mode 100644 index 0000000..93e718a --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropDatabaseReplicationTask.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.DropDatabaseCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.DropDatabaseMessage; + +import java.util.Collections; + +public class DropDatabaseReplicationTask extends ReplicationTask { + private DropDatabaseMessage dropDatabaseMessage = null; + + public DropDatabaseReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_DROP_DATABASE_EVENT); + dropDatabaseMessage = messageFactory.getDeserializer().getDropDatabaseMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return false; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + return Collections.singletonList(new NoopCommand(event.getEventId())); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dstDbName = ReplicationUtils.mapIfMapAvailable(dropDatabaseMessage.getDB(), dbNameMapping); + return Collections.singletonList(new DropDatabaseCommand(dstDbName, event.getEventId())); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropPartitionReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropPartitionReplicationTask.java new file mode 100644 index 0000000..b93610c --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropPartitionReplicationTask.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.DropPartitionCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.DropPartitionMessage; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Map; + +public class DropPartitionReplicationTask extends ReplicationTask { + + DropPartitionMessage dropPartitionMessage = null; + + public DropPartitionReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_DROP_PARTITION_EVENT); + dropPartitionMessage = messageFactory.getDeserializer().getDropPartitionMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return false; + } + + + public Iterable getSrcWhCommands() { + verifyActionable(); + return Collections.singletonList(new 
NoopCommand(event.getEventId())); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + + final String dstDbName = ReplicationUtils.mapIfMapAvailable(dropPartitionMessage.getDB(), dbNameMapping); + final String dstTableName = ReplicationUtils.mapIfMapAvailable(dropPartitionMessage.getTable(), tableNameMapping); + + return Iterables.transform(dropPartitionMessage.getPartitions(), new Function, Command>() { + @Override + public Command apply(@Nullable Map ptnDesc) { + return new DropPartitionCommand( + dstDbName, + dstTableName, + ptnDesc, + true, + event.getEventId() + ); + } + }); + } +} + diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropTableReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropTableReplicationTask.java new file mode 100644 index 0000000..3768f52 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropTableReplicationTask.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.DropTableCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.DropTableMessage; + +import java.util.Collections; + +public class DropTableReplicationTask extends ReplicationTask { + private DropTableMessage dropTableMessage = null; + + public DropTableReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_DROP_TABLE_EVENT); + dropTableMessage = messageFactory.getDeserializer().getDropTableMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return false; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + return Collections.singletonList(new NoopCommand(event.getEventId())); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dstDbName = ReplicationUtils.mapIfMapAvailable(dropTableMessage.getDB(), dbNameMapping); + final String dstTableName = ReplicationUtils.mapIfMapAvailable(dropTableMessage.getTable(), tableNameMapping); + return Collections.singletonList(new DropTableCommand(dstDbName, dstTableName, true, event.getEventId())); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java new file mode 100644 index 0000000..64ddae2 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.NoopReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.common.HCatConstants; + +/** + * EXIMReplicationTaskFactory is an export-import based ReplicationTask.Factory. + * + * It's primary mode of enabling replication is by translating each event it gets + * from the notification subsystem into hive commands that essentially export data + * to be copied over and imported on the other end. + * + * The Commands that Tasks return here are expected to be hive commands. + */ +public class EximReplicationTaskFactory implements ReplicationTask.Factory { + public ReplicationTask create(HCatClient client, HCatNotificationEvent event){ + // TODO : Java 1.7+ support using String with switches, but IDEs don't all seem to know that. + // If casing is fine for now. But we should eventually remove this. Also, I didn't want to + // create another enum just for this. 
+ if (event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) { + return new CreateDatabaseReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { + return new DropDatabaseReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) { + return new CreateTableReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { + return new DropTableReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { + return new AddPartitionReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { + return new DropPartitionReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_ALTER_TABLE_EVENT)) { + return new AlterTableReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_ALTER_PARTITION_EVENT)) { + return new AlterPartitionReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_INSERT_EVENT)) { + return new InsertReplicationTask(event); + } else { + throw new IllegalStateException("Unrecognized Event type, no replication task available"); + } + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/InsertReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/InsertReplicationTask.java new file mode 100644 index 0000000..48e8aa9 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/InsertReplicationTask.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.InsertMessage; + +import java.util.Collections; +import java.util.Map; + +public class InsertReplicationTask extends ReplicationTask { + private final InsertMessage insertMessage; + + public InsertReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_INSERT_EVENT); + insertMessage = messageFactory.getDeserializer().getInsertMessage(event.getMessage()); + } + + + public boolean needsStagingDirs(){ + // we need staging directories as long as a single partition needed addition + return true; + } + + @Override + public Iterable getSrcWhCommands() { + verifyActionable(); + + final String dbName = insertMessage.getDB(); + final String tableName = insertMessage.getTable(); + final Map ptnDesc = insertMessage.getPartitionKeyValues(); + // Note : ptnDesc can be null or empty for non-ptn table + + return Collections.singletonList(new 
ExportCommand( + dbName, + tableName, + ptnDesc, + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, + tableName, + ptnDesc) + ), + false, + event.getEventId() + )); + + } + + public Iterable getDstWhCommands() { + verifyActionable(); + + final String dbName = insertMessage.getDB(); + final String tableName = insertMessage.getTable(); + final Map ptnDesc = insertMessage.getPartitionKeyValues(); + // Note : ptnDesc can be null or empty for non-ptn table + + return Collections.singletonList(new ImportCommand( + ReplicationUtils.mapIfMapAvailable(dbName, dbNameMapping), + ReplicationUtils.mapIfMapAvailable(tableName, tableNameMapping), + ptnDesc, + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, // Note - important to retain the same key as the export + tableName, + ptnDesc) + ), + false, + event.getEventId() + )); + + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index eb21a0f..b2d4644 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -58,6 +58,7 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema.Type; import org.apache.hive.hcatalog.NoExitSecurityManager; +import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; import org.apache.hive.hcatalog.listener.DbNotificationListener; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -87,7 +88,7 @@ private static boolean useExternalMS = false; private static boolean useExternalMSForReplication = false; - private static class RunMS implements Runnable { + public static class RunMS implements Runnable { 
private final String msPort; private List args = new ArrayList(); @@ -156,6 +157,11 @@ public static void startMetaStoreServer() throws Exception { System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); } + + public static HiveConf getConf(){ + return hcatConf; + } + public static String fixPath(String path) { if(!Shell.WINDOWS) { return path; @@ -166,6 +172,7 @@ public static String fixPath(String path) { } return expectedDir; } + @Test public void testBasicDDLCommands() throws Exception { String db = "testdb"; @@ -830,7 +837,64 @@ private void startReplicationTargetMetaStoreIfRequired() throws Exception { @Test public void testReplicationTaskIter() throws Exception { - HCatClient sourceMetastore = HCatClient.create(new Configuration(hcatConf)); + Configuration cfg = new Configuration(hcatConf); + cfg.set(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname,"10"); // set really low batch size to ensure batching + HCatClient sourceMetastore = HCatClient.create(cfg); + + String dbName = "testReplicationTaskIter"; + long baseId = sourceMetastore.getCurrentNotificationEventId(); + + { + // Perform some operations + + // 1: Create a db after dropping if needed => 1 or 2 events + sourceMetastore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + sourceMetastore.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + + // 2: Create an unpartitioned table T1 => 1 event + String tblName1 = "T1"; + List cols1 = HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields(); + HCatTable table1 = (new HCatTable(dbName, tblName1)).cols(cols1); + sourceMetastore.createTable(HCatCreateTableDesc.create(table1).build()); + + // 3: Create a partitioned table T2 => 1 event + + String tblName2 = "T2"; + List cols2 = HCatSchemaUtils.getHCatSchema("a:int").getFields(); + List pcols2 = HCatSchemaUtils.getHCatSchema("b:string").getFields(); + HCatTable table2 = (new 
HCatTable(dbName, tblName2)).cols(cols2).partCols(pcols2); + sourceMetastore.createTable(HCatCreateTableDesc.create(table2).build()); + + // 4: Add a partition P1 to T2 => 1 event + + Map ptnDesc1 = new HashMap(); + ptnDesc1.put("b","test1"); + HCatPartition ptn1 = (new HCatPartition(table2, ptnDesc1, "")); + sourceMetastore.addPartition(HCatAddPartitionDesc.create(ptn1).build()); + + // 5 : Create and drop partition P2 to T2 20 times => 40 events + + for (int i = 0; i < 20; i++){ + Map ptnDesc = new HashMap(); + ptnDesc.put("b","testmul"+i); + HCatPartition ptn = (new HCatPartition(table2, ptnDesc, "")); + sourceMetastore.addPartition(HCatAddPartitionDesc.create(ptn).build()); + sourceMetastore.dropPartitions(dbName,tblName2,ptnDesc,true); + } + + // 6 : Drop table T1 => 1 event + sourceMetastore.dropTable(dbName, tblName1, true); + + // 7 : Drop table T2 => 1 event + sourceMetastore.dropTable(dbName, tblName2, true); + + // verify that the number of events since we began is at least 25 more + long currId = sourceMetastore.getCurrentNotificationEventId(); + assertTrue("currId[" + currId + "] must be more than 25 greater than baseId[" + baseId + "]", currId > baseId + 25); + + } + + // do rest of tests on db we just picked up above. List notifs = sourceMetastore.getNextNotification( 0, 0, new IMetaStoreClient.NotificationFilter() { @@ -844,7 +908,7 @@ public boolean accept(NotificationEvent event) { + ":" + n.getEventTime() + ",t:" + n.getEventType() + ",o:" + n.getDbName() + "." 
+ n.getTableName()); } - Iterator taskIter = sourceMetastore.getReplicationTasks(0, 0, "mydb", null); + Iterator taskIter = sourceMetastore.getReplicationTasks(0, -1, dbName, null); while(taskIter.hasNext()){ ReplicationTask task = taskIter.next(); HCatNotificationEvent n = task.getEvent(); diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/CommandTestUtils.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/CommandTestUtils.java new file mode 100644 index 0000000..468f84f --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/CommandTestUtils.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Class which provides several useful methods to test commands, but is itself not a test. 
+ */ +public class CommandTestUtils { + + private static Log LOG = LogFactory.getLog(CommandTestUtils.class.getName()); + + public static void compareCommands(Command expected, Command actual, boolean ignoreSortOrder) { + // The reason we use compare-command, rather than simply getting the serialized output and comparing + // for partition-based commands is that the partition specification order can be different in different + // serializations, but still be effectively the same. (a="42",b="abc") should be the same as (b="abc",a="42") + assertEquals(expected.getClass(),actual.getClass()); + assertEquals(expected.getEventId(),actual.getEventId()); + assertEquals(expected.isUndoable(),actual.isUndoable()); + assertEquals(expected.isRetriable(),actual.isRetriable()); + + assertEquals(expected.get().size(),actual.get().size()); + Iterator actualIter = actual.get().iterator(); + for (String s : expected.get()){ + if (ignoreSortOrder){ + // compare sorted strings, rather than comparing exact strings. + assertSortedEquals(s, actualIter.next()); + } else { + assertEquals(s,actualIter.next()); + } + } + + if (expected.isUndoable()){ + Iterator actualUndoIter = actual.getUndo().iterator(); + for (String s: expected.getUndo()){ + if (ignoreSortOrder){ + assertSortedEquals(s,actualUndoIter.next()); + } else { + assertEquals(s,actualUndoIter.next()); + } + } + } + } + + private static void assertSortedEquals(String expected, String actual) { + char[] expectedChars = expected.toCharArray(); + Arrays.sort(expectedChars); + char[] actualChars = actual.toCharArray(); + Arrays.sort(actualChars); + assertEquals(String.valueOf(expectedChars), String.valueOf(actualChars)); + } + + public static void testCommandSerialization(Command cmd) { + String serializedCmd = null; + try { + serializedCmd = ReplicationUtils.serializeCommand(cmd); + } catch (IOException e) { + LOG.error("Serialization error",e); + assertNull(e); // error out. 
+ } + + Command cmd2 = null; + try { + cmd2 = ReplicationUtils.deserializeCommand(serializedCmd); + } catch (IOException e) { + LOG.error("Serialization error",e); + assertNull(e); // error out. + } + + assertEquals(cmd.getClass(),cmd2.getClass()); + assertEquals(cmd.getEventId(), cmd2.getEventId()); + assertEquals(cmd.get(), cmd2.get()); + assertEquals(cmd.isUndoable(),cmd2.isUndoable()); + if (cmd.isUndoable()){ + assertEquals(cmd.getUndo(),cmd2.getUndo()); + } + assertEquals(cmd.isRetriable(),cmd2.isRetriable()); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java new file mode 100644 index 0000000..2915c68 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.api.repl; + +import junit.framework.TestCase; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.junit.Test; + +public class TestReplicationTask extends TestCase{ + private static MessageFactory msgFactory = MessageFactory.getInstance(); + + + @Test + public static void testCreate() throws HCatException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + NotificationEvent event = new NotificationEvent(0, (int)System.currentTimeMillis(), + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + ReplicationTask.resetFactory(null); + ReplicationTask rtask = ReplicationTask.create(HCatClient.create(new HiveConf()),new HCatNotificationEvent(event)); + + assertTrue("Default factory instantiation should yield NoopReplicationTask", rtask instanceof NoopReplicationTask); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java new file mode 100644 index 0000000..05fd89f --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java @@ -0,0 +1,585 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
package org.apache.hive.hcatalog.api.repl.commands;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.HcatTestUtils;
import org.apache.hive.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatCreateDBDesc;
import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
import org.apache.hive.hcatalog.api.HCatDatabase;
import org.apache.hive.hcatalog.api.HCatPartition;
import org.apache.hive.hcatalog.api.HCatTable;
import org.apache.hive.hcatalog.api.ObjectNotFoundException;
import org.apache.hive.hcatalog.api.TestHCatClient;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.CommandTestUtils;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * End-to-end tests for the replication Command implementations (drop-db,
 * drop-table, drop-partition, export/import), run against a live metastore
 * and a Hive Driver.
 */
public class TestCommands {

  // BUGFIX: was LogFactory.getLog(CommandTestUtils.class.getName()) — a copy-paste
  // error that attributed this class's log output to CommandTestUtils.
  private static Log LOG = LogFactory.getLog(TestCommands.class.getName());

  private static HiveConf hconf;
  private static Driver driver;
  private static HCatClient client;
  private static String TEST_PATH;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {

    TestHCatClient.startMetaStoreServer();
    hconf = TestHCatClient.getConf();
    // Disable the semantic-analyzer hook so test queries run unmodified.
    hconf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, "");

    // Unique scratch dir per run so reruns do not collide.
    TEST_PATH = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR
        + TestCommands.class.getCanonicalName() + "-" + System.currentTimeMillis();
    Path testPath = new Path(TEST_PATH);
    FileSystem fs = FileSystem.get(testPath.toUri(), hconf);
    fs.mkdirs(testPath);

    driver = new Driver(hconf);
    SessionState.start(new CliSessionState(hconf));
    client = HCatClient.create(hconf);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestHCatClient.tearDown();
  }

  /** DropDatabaseCommand: contract checks, serialization round-trip, and actual drop. */
  @Test
  public void testDropDatabaseCommand() throws HCatException, CommandNeedRetryException {
    String dbName = "cmd_testdb";
    int evid = 999;
    Command testCmd = new DropDatabaseCommand(dbName, evid);

    assertEquals(evid, testCmd.getEventId());
    assertEquals(1, testCmd.get().size());
    assertEquals(true, testCmd.isRetriable());
    assertEquals(false, testCmd.isUndoable());

    CommandTestUtils.testCommandSerialization(testCmd);

    client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
    client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build());
    HCatDatabase db = client.getDatabase(dbName);
    assertNotNull(db);

    LOG.info("About to run :" + testCmd.get().get(0));
    driver.run(testCmd.get().get(0));

    Exception onfe = null;
    try {
      HCatDatabase db_del = client.getDatabase(dbName);
    } catch (Exception e) {
      onfe = e;
    }

    assertNotNull(onfe);
    assertTrue(onfe instanceof ObjectNotFoundException);
  }

  /**
   * DropTableCommand: a replicated drop must be a no-op when the table's
   * repl.state.id is newer than the event, and must drop when it is older;
   * a non-replicated drop drops unconditionally.
   */
  @Test
  public void testDropTableCommand() throws HCatException, CommandNeedRetryException {
    String dbName = "cmd_testdb";
    String tableName = "cmd_testtable";
    int evid = 789;
    List<HCatFieldSchema> cols = HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields();

    Command testReplicatedDropCmd = new DropTableCommand(dbName, tableName, true, evid);

    assertEquals(evid, testReplicatedDropCmd.getEventId());
    assertEquals(1, testReplicatedDropCmd.get().size());
    assertEquals(true, testReplicatedDropCmd.isRetriable());
    assertEquals(false, testReplicatedDropCmd.isUndoable());

    CommandTestUtils.testCommandSerialization(testReplicatedDropCmd);

    Command testNormalDropCmd = new DropTableCommand(dbName, tableName, false, evid);

    assertEquals(evid, testNormalDropCmd.getEventId());
    assertEquals(1, testNormalDropCmd.get().size());
    assertEquals(true, testNormalDropCmd.isRetriable());
    assertEquals(false, testNormalDropCmd.isUndoable());

    CommandTestUtils.testCommandSerialization(testNormalDropCmd);

    client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
    client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build());

    Map<String, String> tprops = new HashMap<String, String>();
    tprops.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid + 5));
    HCatTable tableToCreate = (new HCatTable(dbName, tableName)).tblProps(tprops).cols(cols);

    client.createTable(HCatCreateTableDesc.create(tableToCreate).build());
    HCatTable t1 = client.getTable(dbName, tableName);
    assertNotNull(t1);

    // Test replicated drop, should not drop, because evid < repl.state.id
    LOG.info("About to run :" + testReplicatedDropCmd.get().get(0));
    driver.run(testReplicatedDropCmd.get().get(0));
    HCatTable t2 = client.getTable(dbName, tableName);
    assertNotNull(t2);

    // Test normal drop, should drop unconditionally.
    LOG.info("About to run :" + testNormalDropCmd.get().get(0));
    driver.run(testNormalDropCmd.get().get(0));

    Exception onfe = null;
    try {
      HCatTable t_del = client.getTable(dbName, tableName);
    } catch (Exception e) {
      onfe = e;
    }

    assertNotNull(onfe);
    assertTrue(onfe instanceof ObjectNotFoundException);

    Map<String, String> tprops2 = new HashMap<String, String>();
    tprops2.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid - 5));
    HCatTable tableToCreate2 = (new HCatTable(dbName, tableName)).tblProps(tprops2).cols(cols);

    client.createTable(HCatCreateTableDesc.create(tableToCreate2).build());
    HCatTable t3 = client.getTable(dbName, tableName);
    assertNotNull(t3);

    // Test replicated drop, should drop this time, since repl.state.id < evid.
    LOG.info("About to run :" + testReplicatedDropCmd.get().get(0));
    driver.run(testReplicatedDropCmd.get().get(0));

    Exception onfe2 = null;
    try {
      HCatTable t_del = client.getTable(dbName, tableName);
    } catch (Exception e) {
      onfe2 = e;
    }

    assertNotNull(onfe2);
    assertTrue(onfe2 instanceof ObjectNotFoundException);

  }

  /**
   * DropPartitionCommand: same newer/older repl.state.id semantics as
   * testDropTableCommand, applied at partition granularity.
   */
  @Test
  public void testDropPartitionCommand() throws HCatException, CommandNeedRetryException {
    String dbName = "cmd_testdb";
    String tableName = "cmd_testtable";
    int evid = 789;

    List<HCatFieldSchema> pcols = HCatSchemaUtils.getHCatSchema("b:string").getFields();
    List<HCatFieldSchema> cols = HCatSchemaUtils.getHCatSchema("a:int").getFields();
    Map<String, String> ptnDesc = new HashMap<String, String>();
    ptnDesc.put("b", "test");

    Command testReplicatedDropPtnCmd = new DropPartitionCommand(dbName, tableName, ptnDesc, true, evid);

    assertEquals(evid, testReplicatedDropPtnCmd.getEventId());
    assertEquals(1, testReplicatedDropPtnCmd.get().size());
    assertEquals(true, testReplicatedDropPtnCmd.isRetriable());
    assertEquals(false, testReplicatedDropPtnCmd.isUndoable());

    CommandTestUtils.testCommandSerialization(testReplicatedDropPtnCmd);

    Command testNormalDropPtnCmd = new DropPartitionCommand(dbName, tableName, ptnDesc, false, evid);

    assertEquals(evid, testNormalDropPtnCmd.getEventId());
    assertEquals(1, testNormalDropPtnCmd.get().size());
    assertEquals(true, testNormalDropPtnCmd.isRetriable());
    assertEquals(false, testNormalDropPtnCmd.isUndoable());

    CommandTestUtils.testCommandSerialization(testNormalDropPtnCmd);

    client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
    client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build());

    Map<String, String> props = new HashMap<String, String>();
    props.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid + 5));
    HCatTable table = (new HCatTable(dbName, tableName)).tblProps(props).cols(cols).partCols(pcols);

    client.createTable(HCatCreateTableDesc.create(table).build());
    HCatTable t = client.getTable(dbName, tableName);
    assertNotNull(t);

    HCatPartition ptnToAdd = (new HCatPartition(table, ptnDesc, "")).parameters(props);
    client.addPartition(HCatAddPartitionDesc.create(ptnToAdd).build());

    HCatPartition p1 = client.getPartition(dbName, tableName, ptnDesc);
    assertNotNull(p1);

    // Test replicated drop, should not drop, because evid < repl.state.id
    LOG.info("About to run :" + testReplicatedDropPtnCmd.get().get(0));
    driver.run(testReplicatedDropPtnCmd.get().get(0));
    HCatPartition p2 = client.getPartition(dbName, tableName, ptnDesc);
    assertNotNull(p2);

    // Test normal drop, should drop unconditionally.
    LOG.info("About to run :" + testNormalDropPtnCmd.get().get(0));
    driver.run(testNormalDropPtnCmd.get().get(0));

    Exception onfe = null;
    try {
      HCatPartition p_del = client.getPartition(dbName, tableName, ptnDesc);
    } catch (Exception e) {
      onfe = e;
    }

    assertNotNull(onfe);
    assertTrue(onfe instanceof ObjectNotFoundException);

    Map<String, String> props2 = new HashMap<String, String>();
    props2.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid - 5));

    HCatPartition ptnToAdd2 = (new HCatPartition(table, ptnDesc, "")).parameters(props2);
    client.addPartition(HCatAddPartitionDesc.create(ptnToAdd2).build());

    HCatPartition p3 = client.getPartition(dbName, tableName, ptnDesc);
    assertNotNull(p3);

    // Test replicated drop, should drop this time, since repl.state.id < evid.
    LOG.info("About to run :" + testReplicatedDropPtnCmd.get().get(0));
    driver.run(testReplicatedDropPtnCmd.get().get(0));

    Exception onfe2 = null;
    try {
      HCatPartition p_del = client.getPartition(dbName, tableName, ptnDesc);
    } catch (Exception e) {
      onfe2 = e;
    }

    assertNotNull(onfe2);
    assertTrue(onfe2 instanceof ObjectNotFoundException);
  }

  @Test
  public void testDropTableCommand2() throws HCatException, CommandNeedRetryException {
    // Secondary DropTableCommand test for testing repl-drop-tables' effect on partitions inside a
    // partitioned table when there exist partitions inside the table which are older than the drop
    // event.
    // Our goal is this : Create a table t, with repl.last.id=157, say.
    // Create 2 partitions inside it, with repl.last.id=150 and 160, say.
    // Now, process a drop table command with eventid=155.
    // It should result in the table and the partition with repl.last.id=160 continuing to exist,
    // but dropping the partition with repl.last.id=150.

    String dbName = "cmd_testdb";
    String tableName = "cmd_testtable";
    int evid = 157;

    List<HCatFieldSchema> pcols = HCatSchemaUtils.getHCatSchema("b:string").getFields();
    List<HCatFieldSchema> cols = HCatSchemaUtils.getHCatSchema("a:int").getFields();

    Command testReplicatedDropCmd = new DropTableCommand(dbName, tableName, true, evid);

    client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
    client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build());

    Map<String, String> tprops = new HashMap<String, String>();
    tprops.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid + 2));
    HCatTable table = (new HCatTable(dbName, tableName)).tblProps(tprops).cols(cols).partCols(pcols);

    client.createTable(HCatCreateTableDesc.create(table).build());
    HCatTable t = client.getTable(dbName, tableName);
    assertNotNull(t);

    Map<String, String> ptnDesc1 = new HashMap<String, String>();
    ptnDesc1.put("b", "test-older");
    Map<String, String> props1 = new HashMap<String, String>();
    props1.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid - 5));
    HCatPartition ptnToAdd1 = (new HCatPartition(table, ptnDesc1, "")).parameters(props1);
    client.addPartition(HCatAddPartitionDesc.create(ptnToAdd1).build());

    Map<String, String> ptnDesc2 = new HashMap<String, String>();
    ptnDesc2.put("b", "test-newer");
    Map<String, String> props2 = new HashMap<String, String>();
    props2.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid + 5));
    HCatPartition ptnToAdd2 = (new HCatPartition(table, ptnDesc2, "")).parameters(props2);
    client.addPartition(HCatAddPartitionDesc.create(ptnToAdd2).build());

    HCatPartition p1 = client.getPartition(dbName, tableName, ptnDesc1);
    assertNotNull(p1);
    HCatPartition p2 = client.getPartition(dbName, tableName, ptnDesc2);
    assertNotNull(p2);

    LOG.info("About to run :" + testReplicatedDropCmd.get().get(0));
    driver.run(testReplicatedDropCmd.get().get(0));

    HCatTable t_stillExists = client.getTable(dbName, tableName);
    assertNotNull(t_stillExists);

    HCatPartition p2_stillExists = client.getPartition(dbName, tableName, ptnDesc2);

    Exception onfe = null;
    try {
      HCatPartition p1_del = client.getPartition(dbName, tableName, ptnDesc1);
    } catch (Exception e) {
      onfe = e;
    }

    assertNotNull(onfe);
    assertTrue(onfe instanceof ObjectNotFoundException);
  }


  /**
   * Repl export/import round-trip with data: export carries repl.scope=all and
   * repl.last.id; the imported table retains repl.last.id and the data.
   */
  @Test
  public void testBasicReplEximCommands() throws IOException, CommandNeedRetryException {
    // repl export, has repl.last.id and repl.scope=all in it
    // import repl dump, table has repl.last.id on it (will likely be 0)
    int evid = 111;
    String exportLocation = TEST_PATH + File.separator + "testBasicReplExim";
    Path tempPath = new Path(TEST_PATH, "testBasicReplEximTmp");
    String tempLocation = tempPath.toUri().getPath();

    String dbName = "exim";
    String tableName = "basicSrc";
    String importedTableName = "basicDst";
    List<HCatFieldSchema> cols = HCatSchemaUtils.getHCatSchema("b:string").getFields();

    client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
    client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build());

    HCatTable table = (new HCatTable(dbName, tableName)).cols(cols).fileFormat("textfile");

    client.createTable(HCatCreateTableDesc.create(table).build());
    HCatTable t = client.getTable(dbName, tableName);
    assertNotNull(t);

    String[] data = new String[]{"eleven", "twelve"};

    HcatTestUtils.createTestDataFile(tempLocation, data);

    CommandProcessorResponse ret = driver.run(
        "LOAD DATA LOCAL INPATH '" + tempLocation + "' OVERWRITE INTO TABLE " + dbName + "." + tableName
    );
    assertEquals(ret.getResponseCode() + ":" + ret.getErrorMessage(), null, ret.getException());

    CommandProcessorResponse selectRet = driver.run("SELECT * from " + dbName + "." + tableName);
    assertEquals(selectRet.getResponseCode() + ":" + selectRet.getErrorMessage(),
        null, selectRet.getException());

    List<String> values = new ArrayList<String>();
    driver.getResults(values);

    assertEquals(2, values.size());
    assertEquals(data[0], values.get(0));
    assertEquals(data[1], values.get(1));

    ExportCommand exportCmd = new ExportCommand(dbName, tableName, null,
        exportLocation, false, evid);

    LOG.info("About to run :" + exportCmd.get().get(0));
    CommandProcessorResponse ret2 = driver.run(exportCmd.get().get(0));
    assertEquals(ret2.getResponseCode() + ":" + ret2.getErrorMessage(), null, ret2.getException());

    List<String> exportPaths = exportCmd.cleanupLocationsAfterEvent();
    assertEquals(1, exportPaths.size());
    String metadata = getMetadataContents(exportPaths.get(0));
    LOG.info("Export returned the following _metadata contents:");
    LOG.info(metadata);
    assertTrue(metadata + " did not match \"repl.scope\"=\"all\"", metadata.matches(".*\"repl.scope\":\"all\".*"));
    assertTrue(metadata + " does not contain \"repl.last.id\"", metadata.matches(".*\"repl.last.id\":.*"));

    ImportCommand importCmd = new ImportCommand(dbName, importedTableName, null, exportLocation, false, evid);

    LOG.info("About to run :" + importCmd.get().get(0));
    CommandProcessorResponse ret3 = driver.run(importCmd.get().get(0));
    assertEquals(ret3.getResponseCode() + ":" + ret3.getErrorMessage(), null, ret3.getException());

    CommandProcessorResponse selectRet2 = driver.run("SELECT * from " + dbName + "." + importedTableName);
    assertEquals(selectRet2.getResponseCode() + ":" + selectRet2.getErrorMessage(),
        null, selectRet2.getException());

    List<String> values2 = new ArrayList<String>();
    driver.getResults(values2);

    assertEquals(2, values2.size());
    assertEquals(data[0], values2.get(0));
    assertEquals(data[1], values2.get(1));

    HCatTable importedTable = client.getTable(dbName, importedTableName);
    assertNotNull(importedTable);

    assertTrue(importedTable.getTblProps().containsKey("repl.last.id"));
  }

  /**
   * Metadata-only export/import: export carries repl.scope=metadata; the imported
   * table has the schema and repl.last.id but no rows.
   */
  @Test
  public void testMetadataReplEximCommands() throws IOException, CommandNeedRetryException {
    // repl metadata export, has repl.last.id and repl.scope=metadata
    // import repl metadata dump, table metadata changed, allows override, has repl.last.id
    int evid = 222;
    String exportLocation = TEST_PATH + File.separator + "testMetadataReplExim";
    Path tempPath = new Path(TEST_PATH, "testMetadataReplEximTmp");
    String tempLocation = tempPath.toUri().getPath();

    String dbName = "exim";
    String tableName = "basicSrc";
    String importedTableName = "basicDst";
    List<HCatFieldSchema> cols = HCatSchemaUtils.getHCatSchema("b:string").getFields();

    client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
    client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build());

    HCatTable table = (new HCatTable(dbName, tableName)).cols(cols).fileFormat("textfile");

    client.createTable(HCatCreateTableDesc.create(table).build());
    HCatTable t = client.getTable(dbName, tableName);
    assertNotNull(t);

    String[] data = new String[]{"eleven", "twelve"};

    HcatTestUtils.createTestDataFile(tempLocation, data);

    CommandProcessorResponse ret = driver.run(
        "LOAD DATA LOCAL INPATH '" + tempLocation + "' OVERWRITE INTO TABLE " + dbName + "." + tableName
    );
    assertEquals(ret.getResponseCode() + ":" + ret.getErrorMessage(), null, ret.getException());

    CommandProcessorResponse selectRet = driver.run("SELECT * from " + dbName + "." + tableName);
    assertEquals(selectRet.getResponseCode() + ":" + selectRet.getErrorMessage(),
        null, selectRet.getException());

    List<String> values = new ArrayList<String>();
    driver.getResults(values);

    assertEquals(2, values.size());
    assertEquals(data[0], values.get(0));
    assertEquals(data[1], values.get(1));

    ExportCommand exportMdCmd = new ExportCommand(dbName, tableName, null,
        exportLocation, true, evid);

    LOG.info("About to run :" + exportMdCmd.get().get(0));
    CommandProcessorResponse ret2 = driver.run(exportMdCmd.get().get(0));
    assertEquals(ret2.getResponseCode() + ":" + ret2.getErrorMessage(), null, ret2.getException());

    List<String> exportPaths = exportMdCmd.cleanupLocationsAfterEvent();
    assertEquals(1, exportPaths.size());
    String metadata = getMetadataContents(exportPaths.get(0));
    LOG.info("Export returned the following _metadata contents:");
    LOG.info(metadata);
    assertTrue(metadata + " did not match \"repl.scope\"=\"metadata\"", metadata.matches(".*\"repl.scope\":\"metadata\".*"));
    assertTrue(metadata + " does not contain \"repl.last.id\"", metadata.matches(".*\"repl.last.id\":.*"));

    ImportCommand importMdCmd = new ImportCommand(dbName, importedTableName, null, exportLocation, true, evid);

    LOG.info("About to run :" + importMdCmd.get().get(0));
    CommandProcessorResponse ret3 = driver.run(importMdCmd.get().get(0));
    assertEquals(ret3.getResponseCode() + ":" + ret3.getErrorMessage(), null, ret3.getException());

    CommandProcessorResponse selectRet2 = driver.run("SELECT * from " + dbName + "." + importedTableName);
    assertEquals(selectRet2.getResponseCode() + ":" + selectRet2.getErrorMessage(),
        null, selectRet2.getException());

    List<String> values2 = new ArrayList<String>();
    driver.getResults(values2);

    // Metadata-only import: no rows expected.
    assertEquals(0, values2.size());

    HCatTable importedTable = client.getTable(dbName, importedTableName);
    assertNotNull(importedTable);

    assertTrue(importedTable.getTblProps().containsKey("repl.last.id"));
  }


  /**
   * Noop export/import on a non-existent table must not error; the export is
   * marked repl.noop and the import creates nothing.
   */
  @Test
  public void testNoopReplEximCommands() throws CommandNeedRetryException, IOException {
    // repl noop export on non-existant table, has repl.noop, does not error
    // import repl noop dump, no error

    int evid = 333;
    String exportLocation = TEST_PATH + File.separator + "testNoopReplExim";
    String dbName = "doesNotExist" + System.currentTimeMillis();
    String tableName = "nope" + System.currentTimeMillis();

    ExportCommand noopExportCmd = new ExportCommand(dbName, tableName, null,
        exportLocation, false, evid);

    LOG.info("About to run :" + noopExportCmd.get().get(0));
    CommandProcessorResponse ret = driver.run(noopExportCmd.get().get(0));
    assertEquals(ret.getResponseCode() + ":" + ret.getErrorMessage(), null, ret.getException());

    List<String> exportPaths = noopExportCmd.cleanupLocationsAfterEvent();
    assertEquals(1, exportPaths.size());
    String metadata = getMetadataContents(exportPaths.get(0));
    LOG.info("Export returned the following _metadata contents:");
    LOG.info(metadata);
    assertTrue(metadata + " did not match \"repl.noop\"=\"true\"", metadata.matches(".*\"repl.noop\":\"true\".*"));

    ImportCommand noopImportCmd = new ImportCommand(dbName, tableName, null, exportLocation, false, evid);

    LOG.info("About to run :" + noopImportCmd.get().get(0));
    CommandProcessorResponse ret2 = driver.run(noopImportCmd.get().get(0));
    assertEquals(ret2.getResponseCode() + ":" + ret2.getErrorMessage(), null, ret2.getException());

    Exception onfe = null;
    try {
      HCatDatabase d_doesNotExist = client.getDatabase(dbName);
    } catch (Exception e) {
      onfe = e;
    }

    assertNotNull(onfe);
    assertTrue(onfe instanceof ObjectNotFoundException);
  }

  /**
   * Reads and returns the contents of the _metadata file under the given export
   * path, asserting that the file exists.
   */
  private static String getMetadataContents(String exportPath) throws IOException {
    Path mdFilePath = new Path(exportPath, "_metadata");

    FileSystem fs = FileSystem.get(mdFilePath.toUri(), hconf);
    assertTrue(mdFilePath.toUri().toString() + " does not exist", fs.exists(mdFilePath));

    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(mdFilePath)));
    // BUGFIX: close the reader even if readLine() throws, to avoid leaking the
    // underlying stream.
    try {
      StringBuilder sb = new StringBuilder();
      String line;
      while ((line = reader.readLine()) != null) {
        sb.append(line);
      }
      return sb.toString();
    } finally {
      reader.close();
    }
  }

}
+ */ +package org.apache.hive.hcatalog.api.repl.commands; + +import junit.framework.TestCase; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.CommandTestUtils; +import org.junit.Test; + +public class TestNoopCommand extends TestCase { + + @Test + public static void testCommand(){ + int evid = 999; + Command testCmd = new NoopCommand(evid); + + assertEquals(evid,testCmd.getEventId()); + assertEquals(0, testCmd.get().size()); + assertEquals(true,testCmd.isRetriable()); + assertEquals(true,testCmd.isUndoable()); + assertEquals(0, testCmd.getUndo().size()); + + CommandTestUtils.testCommandSerialization(testCmd); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java new file mode 100644 index 0000000..861ebc8 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java @@ -0,0 +1,598 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.TestHCatClient; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.CommandTestUtils; +import org.apache.hive.hcatalog.api.repl.NoopReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider; +import org.apache.hive.hcatalog.api.repl.commands.DropDatabaseCommand; +import org.apache.hive.hcatalog.api.repl.commands.DropPartitionCommand; +import org.apache.hive.hcatalog.api.repl.commands.DropTableCommand; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertTrue; + +public class TestEximReplicationTasks{ + + private static MessageFactory msgFactory = MessageFactory.getInstance(); + private static StagingDirectoryProvider stagingDirectoryProvider = + new StagingDirectoryProvider.TrivialImpl("/tmp","/"); + private static HCatClient client; + + @BeforeClass + public static void setUpBeforeClass() throws HCatException { + + client = HCatClient.create(new HiveConf()); + + ReplicationTask.resetFactory(EximReplicationTaskFactory.class); + } + + // Dummy mapping used for all db and table name mappings + static Function debugMapping = new Function(){ + @Nullable + @Override + public String apply(@Nullable String s) { + if (s == null){ + return null; + } else { + StringBuilder sb = new StringBuilder(s); + return sb.toString() + sb.reverse().toString(); + } + } + }; + + @Test + public void testDebugMapper(){ + assertEquals("BlahhalB",debugMapping.apply("Blah")); + assertEquals(null, debugMapping.apply(null)); + assertEquals("", debugMapping.apply("")); + } + + @Test + public void testCreateDb(){ + Database db = new Database(); + db.setName("testdb"); + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_CREATE_DATABASE_EVENT, msgFactory.buildCreateDatabaseMessage(db).toString()); + event.setDbName(db.getName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyCreateDbReplicationTask(rtask); // CREATE DB currently replicated as Noop. 
+ } + + private static void verifyCreateDbReplicationTask(ReplicationTask rtask) { + assertEquals(CreateDatabaseReplicationTask.class, rtask.getClass()); + assertTrue("CreateDatabaseReplicationTask should be a noop", rtask instanceof NoopReplicationTask); + assertEquals(false, rtask.needsStagingDirs()); + assertEquals(true, rtask.isActionable()); + for (Command c : rtask.getSrcWhCommands()){ + assertEquals(NoopCommand.class,c.getClass()); + } + for (Command c : rtask.getDstWhCommands()){ + assertEquals(NoopCommand.class,c.getClass()); + } + } + + @Test + public void testDropDb() throws IOException { + Database db = new Database(); + db.setName("testdb"); + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_DROP_DATABASE_EVENT, msgFactory.buildCreateDatabaseMessage(db).toString()); + event.setDbName(db.getName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyDropDbReplicationTask(rtask); + + } + + private static void verifyDropDbReplicationTask(ReplicationTask rtask) throws IOException { + assertEquals(DropDatabaseReplicationTask.class, rtask.getClass()); + assertEquals(false, rtask.needsStagingDirs()); + assertEquals(true, rtask.isActionable()); + + rtask + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + for (Command c : rtask.getSrcWhCommands()){ + assertEquals(NoopCommand.class, c.getClass()); + } + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(DropDatabaseCommand.class, dstCommands.get(0).getClass()); + + DropDatabaseCommand dropDatabaseCommand = new DropDatabaseCommand( + debugMapping.apply(rtask.getEvent().getDbName()), + rtask.getEvent().getEventId()); + + assertEquals(ReplicationUtils.serializeCommand(dropDatabaseCommand), + 
ReplicationUtils.serializeCommand(dstCommands.get(0))); + } + + @Test + public void testCreateTable() throws IOException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyCreateTableReplicationTask(rtask); + } + + private static void verifyCreateTableReplicationTask(ReplicationTask rtask) throws IOException { + assertEquals(CreateTableReplicationTask.class, rtask.getClass()); + assertEquals(true, rtask.needsStagingDirs()); + assertEquals(false, rtask.isActionable()); + + rtask + .withSrcStagingDirProvider(stagingDirectoryProvider) + .withDstStagingDirProvider(stagingDirectoryProvider) + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + assertEquals(true, rtask.isActionable()); + + List srcCommands = Lists.newArrayList(rtask.getSrcWhCommands()); + assertEquals(1,srcCommands.size()); + assertEquals(ExportCommand.class, srcCommands.get(0).getClass()); + + ExportCommand exportCommand = getExpectedExportCommand(rtask, null,false); + + assertEquals(ReplicationUtils.serializeCommand(exportCommand), + ReplicationUtils.serializeCommand(srcCommands.get(0))); + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(ImportCommand.class, dstCommands.get(0).getClass()); + + ImportCommand importCommand = getExpectedImportCommand(rtask,null,false); + + assertEquals(ReplicationUtils.serializeCommand(importCommand), + ReplicationUtils.serializeCommand(dstCommands.get(0))); + } + + @Test + public void testDropTable() throws 
IOException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyDropTableReplicationTask(rtask); + } + + private static void verifyDropTableReplicationTask(ReplicationTask rtask) throws IOException { + assertEquals(DropTableReplicationTask.class, rtask.getClass()); + assertEquals(false, rtask.needsStagingDirs()); + assertEquals(true, rtask.isActionable()); + + rtask + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + for (Command c : rtask.getSrcWhCommands()){ + assertEquals(NoopCommand.class, c.getClass()); + } + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(DropTableCommand.class, dstCommands.get(0).getClass()); + + DropTableCommand dropTableCommand = new DropTableCommand( + debugMapping.apply(rtask.getEvent().getDbName()), + debugMapping.apply(rtask.getEvent().getTableName()), + true, + rtask.getEvent().getEventId()); + + assertEquals(ReplicationUtils.serializeCommand(dropTableCommand), + ReplicationUtils.serializeCommand(dstCommands.get(0))); + } + + @Test + public void testAlterTable() throws IOException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_ALTER_TABLE_EVENT, msgFactory.buildAlterTableMessage(t, t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + 
ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyAlterTableReplicationTask(rtask); + } + + private static void verifyAlterTableReplicationTask(ReplicationTask rtask) throws IOException { + assertEquals(AlterTableReplicationTask.class, rtask.getClass()); + assertEquals(true, rtask.needsStagingDirs()); + assertEquals(false, rtask.isActionable()); + + rtask + .withSrcStagingDirProvider(stagingDirectoryProvider) + .withDstStagingDirProvider(stagingDirectoryProvider) + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + assertEquals(true, rtask.isActionable()); + + List srcCommands = Lists.newArrayList(rtask.getSrcWhCommands()); + assertEquals(1, srcCommands.size()); + assertEquals(ExportCommand.class, srcCommands.get(0).getClass()); + + ExportCommand exportCommand = getExpectedExportCommand(rtask, null, true); + + assertEquals(ReplicationUtils.serializeCommand(exportCommand), + ReplicationUtils.serializeCommand(srcCommands.get(0))); + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(ImportCommand.class, dstCommands.get(0).getClass()); + + ImportCommand importCommand = getExpectedImportCommand(rtask, null, true); + + assertEquals(ReplicationUtils.serializeCommand(importCommand), + ReplicationUtils.serializeCommand(dstCommands.get(0))); + } + + @Test + public void testAddPartition() throws IOException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + List pkeys = HCatSchemaUtils.getFieldSchemas( + HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields()); + t.setPartitionKeys(pkeys); + + List addedPtns = new ArrayList(); + addedPtns.add(createPtn(t, Arrays.asList("120", "abc"))); + addedPtns.add(createPtn(t, Arrays.asList("201", "xyz"))); + + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_ADD_PARTITION_EVENT, 
msgFactory.buildAddPartitionMessage(t, addedPtns.iterator()).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyAddPartitionReplicationTask(rtask, t, addedPtns); + + } + + private static void verifyAddPartitionReplicationTask(ReplicationTask rtask, Table table, List addedPtns) throws IOException { + assertEquals(AddPartitionReplicationTask.class, rtask.getClass()); + assertEquals(true, rtask.needsStagingDirs()); + assertEquals(false,rtask.isActionable()); + + rtask + .withSrcStagingDirProvider(stagingDirectoryProvider) + .withDstStagingDirProvider(stagingDirectoryProvider) + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + assertEquals(true,rtask.isActionable()); + + List srcCommands = Lists.newArrayList(rtask.getSrcWhCommands()); + assertEquals(2,srcCommands.size()); + assertEquals(ExportCommand.class, srcCommands.get(0).getClass()); + assertEquals(ExportCommand.class, srcCommands.get(1).getClass()); + + ExportCommand exportCommand1 = getExpectedExportCommand(rtask, getPtnDesc(table,addedPtns.get(0)), false); + ExportCommand exportCommand2 = getExpectedExportCommand(rtask, getPtnDesc(table,addedPtns.get(1)), false); + + CommandTestUtils.compareCommands(exportCommand1, srcCommands.get(0), true); + CommandTestUtils.compareCommands(exportCommand2, srcCommands.get(1), true); + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(2,dstCommands.size()); + assertEquals(ImportCommand.class, dstCommands.get(0).getClass()); + assertEquals(ImportCommand.class, dstCommands.get(1).getClass()); + + ImportCommand importCommand1 = getExpectedImportCommand(rtask, getPtnDesc(table,addedPtns.get(0)), false); + ImportCommand importCommand2 = getExpectedImportCommand(rtask, 
getPtnDesc(table,addedPtns.get(1)), false); + + CommandTestUtils.compareCommands(importCommand1, dstCommands.get(0), true); + CommandTestUtils.compareCommands(importCommand2, dstCommands.get(1), true); + } + + private static Map getPtnDesc(Table t, Partition p) { + assertEquals(t.getPartitionKeysSize(),p.getValuesSize()); + Map retval = new HashMap(); + Iterator pval = p.getValuesIterator(); + for (FieldSchema fs : t.getPartitionKeys()){ + retval.put(fs.getName(),pval.next()); + } + return retval; + } + + private static Partition createPtn(Table t, List pvals) { + Partition ptn = new Partition(); + ptn.setDbName(t.getDbName()); + ptn.setTableName(t.getTableName()); + ptn.setValues(pvals); + return ptn; + } + + @Test + public void testDropPartition() throws HCatException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + List pkeys = HCatSchemaUtils.getFieldSchemas( + HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields()); + t.setPartitionKeys(pkeys); + + Partition p = createPtn(t, Arrays.asList("102", "lmn")); + + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_DROP_PARTITION_EVENT, msgFactory.buildDropPartitionMessage(t,p).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyDropPartitionReplicationTask(rtask, t, p); + } + + private static void verifyDropPartitionReplicationTask(ReplicationTask rtask, Table table, Partition ptn) { + assertEquals(DropPartitionReplicationTask.class, rtask.getClass()); + assertEquals(false, rtask.needsStagingDirs()); + assertEquals(true, rtask.isActionable()); + + rtask + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + assertEquals(true,rtask.isActionable()); + + for (Command c : 
rtask.getSrcWhCommands()){ + assertEquals(NoopCommand.class, c.getClass()); + } + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(DropPartitionCommand.class, dstCommands.get(0).getClass()); + + DropPartitionCommand dropPartitionCommand = new DropPartitionCommand( + debugMapping.apply(rtask.getEvent().getDbName()), + debugMapping.apply(rtask.getEvent().getTableName()), + getPtnDesc(table,ptn), + true, + rtask.getEvent().getEventId() + ); + + CommandTestUtils.compareCommands(dropPartitionCommand, dstCommands.get(0), true); + } + + @Test + public void testAlterPartition() throws HCatException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + List pkeys = HCatSchemaUtils.getFieldSchemas( + HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields()); + t.setPartitionKeys(pkeys); + Partition p = createPtn(t, Arrays.asList("102", "lmn")); + + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_ALTER_PARTITION_EVENT, msgFactory.buildAlterPartitionMessage(t,p,p).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyAlterPartitionReplicationTask(rtask, t, p); + } + + private static void verifyAlterPartitionReplicationTask(ReplicationTask rtask, Table table, Partition ptn) { + assertEquals(AlterPartitionReplicationTask.class, rtask.getClass()); + assertEquals(true, rtask.needsStagingDirs()); + assertEquals(false,rtask.isActionable()); + + rtask + .withSrcStagingDirProvider(stagingDirectoryProvider) + .withDstStagingDirProvider(stagingDirectoryProvider) + .withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + assertEquals(true,rtask.isActionable()); + + List srcCommands = 
Lists.newArrayList(rtask.getSrcWhCommands()); + assertEquals(1,srcCommands.size()); + assertEquals(ExportCommand.class, srcCommands.get(0).getClass()); + + ExportCommand exportCommand = getExpectedExportCommand(rtask, getPtnDesc(table, ptn), true); + CommandTestUtils.compareCommands(exportCommand, srcCommands.get(0), true); + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(ImportCommand.class, dstCommands.get(0).getClass()); + + ImportCommand importCommand = getExpectedImportCommand(rtask, getPtnDesc(table, ptn), true); + CommandTestUtils.compareCommands(importCommand, dstCommands.get(0), true); + } + + @Test + public void testInsert() throws HCatException { + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + List pkeys = HCatSchemaUtils.getFieldSchemas( + HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields()); + t.setPartitionKeys(pkeys); + Partition p = createPtn(t, Arrays.asList("102", "lmn")); + List files = Arrays.asList("/tmp/test123"); + + NotificationEvent event = new NotificationEvent(getEventId(), getTime(), + HCatConstants.HCAT_INSERT_EVENT, msgFactory.buildInsertMessage( + t.getDbName(), + t.getTableName(), + getPtnDesc(t,p), + files + ).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + HCatNotificationEvent hev = new HCatNotificationEvent(event); + ReplicationTask rtask = ReplicationTask.create(client,hev); + + assertEquals(hev.toString(), rtask.getEvent().toString()); + verifyInsertReplicationTask(rtask, t, p); + } + + private static void verifyInsertReplicationTask(ReplicationTask rtask, Table table, Partition ptn) { + assertEquals(InsertReplicationTask.class, rtask.getClass()); + assertEquals(true, rtask.needsStagingDirs()); + assertEquals(false,rtask.isActionable()); + + rtask + .withSrcStagingDirProvider(stagingDirectoryProvider) + .withDstStagingDirProvider(stagingDirectoryProvider) + 
.withDbNameMapping(debugMapping) + .withTableNameMapping(debugMapping); + + assertEquals(true,rtask.isActionable()); + + List srcCommands = Lists.newArrayList(rtask.getSrcWhCommands()); + assertEquals(1,srcCommands.size()); + assertEquals(ExportCommand.class, srcCommands.get(0).getClass()); + + ExportCommand exportCommand = getExpectedExportCommand(rtask, getPtnDesc(table, ptn), false); + CommandTestUtils.compareCommands(exportCommand, srcCommands.get(0), true); + + List dstCommands = Lists.newArrayList(rtask.getDstWhCommands()); + assertEquals(1,dstCommands.size()); + assertEquals(ImportCommand.class, dstCommands.get(0).getClass()); + + ImportCommand importCommand = getExpectedImportCommand(rtask, getPtnDesc(table, ptn), false); + CommandTestUtils.compareCommands(importCommand, dstCommands.get(0), true); + } + + private static long getEventId() { + // Does not need to be unique, just non-zero distinct value to test against. + return 42; + } + + private static int getTime() { + // Does not need to be actual time, just non-zero distinct value to test against. 
+ return 1729; + } + + private static ImportCommand getExpectedImportCommand(ReplicationTask rtask, Map ptnDesc, boolean isMetadataOnly) { + String dbName = rtask.getEvent().getDbName(); + String tableName = rtask.getEvent().getTableName(); + return new ImportCommand( + ReplicationUtils.mapIfMapAvailable(dbName, debugMapping), + ReplicationUtils.mapIfMapAvailable(tableName, debugMapping), + ptnDesc, + stagingDirectoryProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + rtask.getEvent().getEventId(), + dbName, + tableName, + ptnDesc) + ), + isMetadataOnly, + rtask.getEvent().getEventId() + ); + } + + private static ExportCommand getExpectedExportCommand(ReplicationTask rtask, Map ptnDesc, boolean isMetadataOnly) { + String dbName = rtask.getEvent().getDbName(); + String tableName = rtask.getEvent().getTableName(); + return new ExportCommand( + dbName, + tableName, + ptnDesc, + stagingDirectoryProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + rtask.getEvent().getEventId(), + dbName, + tableName, + ptnDesc) + ), + isMetadataOnly, + rtask.getEvent().getEventId() + ); + } + +} diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 8ef5394..91cc03e 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -263,9 +263,10 @@ public void alterPartition() throws Exception { assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); assertEquals("default", event.getDbName()); assertEquals("alterparttable", event.getTableName()); - assertTrue(event.getMessage().matches( "\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + + assertTrue(event.getMessage(), + 
event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," + - "\"timestamp\":[0-9]+,\"values\":\\[\"today\"]}")); + "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}")); } @Test @@ -303,55 +304,81 @@ public void dropPartition() throws Exception { @Test public void insertTable() throws Exception { + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, emptyParameters); + Table table = new Table("insertTable", "default", "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createTable(table); + FireEventRequestData data = new FireEventRequestData(); InsertEventRequestData insertData = new InsertEventRequestData(); data.setInsertData(insertData); insertData.addToFilesAdded("/warehouse/mytable/b1"); FireEventRequest rqst = new FireEventRequest(true, data); - rqst.setDbName("mydb"); - rqst.setTableName("mytable"); + rqst.setDbName("default"); + rqst.setTableName("insertTable"); msClient.fireListenerEvent(rqst); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); - assertEquals(1, rsp.getEventsSize()); + assertEquals(2, rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(0); - assertEquals(firstEventId + 1, event.getEventId()); + NotificationEvent event = rsp.getEvents().get(1); + assertEquals(firstEventId + 2, event.getEventId()); assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType()); - assertEquals("mydb", event.getDbName()); - assertEquals("mytable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," + - 
"\"servicePrincipal\":\"\",\"db\":\"mydb\",\"table\":" + - "\"mytable\",\"timestamp\":[0-9]+,\"partitionValues\":null," + - "\"files\":\\[\"/warehouse/mytable/b1\"]}")); + assertEquals("default", event.getDbName()); + assertEquals("insertTable", event.getTableName()); + assertTrue(event.getMessage(), + event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"insertTable\",\"timestamp\":[0-9]+,\"files\":\\[\"/warehouse/mytable/b1\"]," + + "\"partKeyVals\":\\{},\"partitionKeyValues\":\\{}}")); } @Test public void insertPartition() throws Exception { + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + List partCols = new ArrayList(); + partCols.add(new FieldSchema("ds", "string", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, emptyParameters); + Table table = new Table("insertPartition", "default", "me", startTime, startTime, 0, sd, + partCols, emptyParameters, null, null, null); + msClient.createTable(table); + Partition partition = new Partition(Arrays.asList("today"), "default", "insertPartition", + startTime, startTime, sd, emptyParameters); + msClient.add_partition(partition); + FireEventRequestData data = new FireEventRequestData(); InsertEventRequestData insertData = new InsertEventRequestData(); data.setInsertData(insertData); insertData.addToFilesAdded("/warehouse/mytable/today/b1"); FireEventRequest rqst = new FireEventRequest(true, data); - rqst.setDbName("mydb"); - rqst.setTableName("mytable"); + rqst.setDbName("default"); + rqst.setTableName("insertPartition"); rqst.setPartitionVals(Arrays.asList("today")); msClient.fireListenerEvent(rqst); NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); - assertEquals(1, rsp.getEventsSize()); + assertEquals(3, 
rsp.getEventsSize()); - NotificationEvent event = rsp.getEvents().get(0); - assertEquals(firstEventId + 1, event.getEventId()); + NotificationEvent event = rsp.getEvents().get(2); + assertEquals(firstEventId + 3, event.getEventId()); assertTrue(event.getEventTime() >= startTime); assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType()); - assertEquals("mydb", event.getDbName()); - assertEquals("mytable", event.getTableName()); - assertTrue(event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," + - "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"table\":" + - "\"mytable\",\"timestamp\":[0-9]+,\"partitionValues\":\\[\"today\"]," + - "\"files\":\\[\"/warehouse/mytable/today/b1\"]}")); + assertEquals("default", event.getDbName()); + assertEquals("insertPartition", event.getTableName()); + assertTrue(event.getMessage(), + event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," + + "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" + + "\"insertPartition\",\"timestamp\":[0-9]+," + + "\"files\":\\[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":\\{\"ds\":\"today\"}," + + "\"partitionKeyValues\":\\{\"ds\":\"today\"}}")); } @Test diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index be66cbe..2a313b0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -19,13 +19,14 @@ package org.apache.hadoop.hive.metastore.events; import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; -import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; -import 
org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import java.util.Arrays;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class InsertEvent extends ListenerEvent {
@@ -33,24 +34,30 @@
   // we have just the string names, but that's fine for what we need.
   private final String db;
   private final String table;
-  private final List partVals;
+  private final Map keyValues;
   private final List files;
 
   /**
    *
    * @param db name of the database the table is in
    * @param table name of the table being inserted into
-   * @param partitions list of partition values, can be null
+   * @param partVals list of partition values, can be null
    * @param status status of insert, true = success, false = failure
    * @param handler handler that is firing the event
    */
-  public InsertEvent(String db, String table, List partitions, List files,
-      boolean status, HMSHandler handler) {
+  public InsertEvent(String db, String table, List partVals, List files,
+      boolean status, HMSHandler handler) throws MetaException, NoSuchObjectException {
     super(status, handler);
     this.db = db;
     this.table = table;
-    this.partVals = partitions;
     this.files = files;
+    Table t = handler.get_table(db,table);
+    keyValues = new LinkedHashMap();
+    if (partVals != null) {
+      for (int i = 0; i < partVals.size(); i++) {
+        keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i));
+      }
+    }
   }
 
   public String getDb() {
@@ -64,10 +71,10 @@ public String getTable() {
   }
 
   /**
-   * @return List of partitions.
+   * @return Map of partition key names to the corresponding partition values.
    */
-  public List getPartitions() {
-    return partVals;
+  public Map getPartitionKeyValues() {
+    return keyValues;
   }
 
   /**