diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fb23c40..81938af 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1589,6 +1589,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "imported on to tables that are the target of replication. If this parameter is\n" + "set, regular imports will check if the destination table(if it exists) has a " + "'repl.last.id' set on it. If so, it will fail."), + HIVE_REPL_TASK_FACTORY("hive.repl.task.factory", "", + "Parameter that can be used to override which ReplicationTaskFactory will be\n" + + "used to instantiate ReplicationTask events. Override for third-party repl plugins."), HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false, ""), HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false, "should rework the mapred work or not.\n" + diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 358a882..81cc8c4 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -187,7 +187,7 @@ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaEx Partition after = partitionEvent.getNewPartition(); NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_ALTER_PARTITION_EVENT, - msgFactory.buildAlterPartitionMessage(before, after).toString()); + msgFactory.buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); enqueue(event); @@ -223,7 +223,7 @@ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { public void onInsert(InsertEvent insertEvent) throws MetaException { NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT, msgFactory.buildInsertMessage(insertEvent.getDb(), insertEvent.getTable(), - insertEvent.getPartitions(), insertEvent.getFiles()).toString()); + insertEvent.getPartitionKeyValues(), insertEvent.getFiles()).toString()); event.setDbName(insertEvent.getDb()); event.setTableName(insertEvent.getTable()); enqueue(event); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 1718d79..da3d4da 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -55,11 +55,8 @@ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.events.InsertEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hive.hcatalog.common.HCatConstants; -import 
org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.hive.hcatalog.messaging.MessageFactory; import org.slf4j.Logger; @@ -150,7 +147,7 @@ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { Partition after = ape.getNewPartition(); String topicName = getTopicName(ape.getTable()); - send(messageFactory.buildAlterPartitionMessage(before, after), topicName); + send(messageFactory.buildAlterPartitionMessage(ape.getTable(), before, after), topicName); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java index 7412b60..10a300d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java @@ -18,7 +18,7 @@ */ package org.apache.hive.hcatalog.messaging; -import java.util.List; +import java.util.Map; /** * HCat message sent when a partition is altered. */ @@ -31,12 +31,12 @@ protected AlterPartitionMessage() { public abstract String getTable(); - public abstract List getValues(); + public abstract Map getKeyValues(); @Override public HCatEventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); - if (getValues() == null) throw new IllegalStateException("Partition values unset"); + if (getKeyValues() == null) throw new IllegalStateException("Partition values unset"); return super.checkValid(); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java index b25da29..be7ea10 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java @@ -20,6 +20,7 @@ package org.apache.hive.hcatalog.messaging; import java.util.List; +import java.util.Map; /** * HCat message sent when an insert is done to a table or partition. @@ -37,11 +38,11 @@ protected InsertMessage() { public abstract String getTable(); /** - * Get the list of partition values. Will be null if this insert is to a table and not a + * Get the map of partition key-values. Will be null if this insert is to a table and not a * partition. - * @return List of partition values, or null. + * @return Map of partition key-values, or null. */ - public abstract List getPartitionValues(); + public abstract Map getPartitionKeyValues(); /** * Get the list of files created as a result of this DML operation. May be null. diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 2b16745..224c61e 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; /** * Abstract Factory for the construction of HCatalog message instances. 
@@ -149,7 +150,7 @@ public static MessageDeserializer getDeserializer(String format, * @param after The partition after it was altered * @return a new AlterPartitionMessage */ - public abstract AlterPartitionMessage buildAlterPartitionMessage(Partition before, + public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, Partition after); /** @@ -170,5 +171,5 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Partition befor * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - List partVals, List files); + Map partVals, List files); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java index 1e2456d..4f1d104 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -40,21 +40,25 @@ Long timestamp; @JsonProperty - List values; + Map keyValues; + /** + * Default constructor, needed for Jackson. + */ + public JSONAlterPartitionMessage() {} public JSONAlterPartitionMessage(String server, String servicePrincipal, String db, String table, - List values, + Map keyValues, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.timestamp = timestamp; - this.values = values; + this.keyValues = keyValues; checkValid(); } @@ -85,8 +89,8 @@ public String getTable() { } @Override - public List getValues() { - return values; + public Map getKeyValues() { + return keyValues; } @Override diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index 2848843..b057d4a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -37,6 +37,11 @@ @JsonProperty Long timestamp; + /** + * Default constructor, needed for Jackson. + */ + public JSONAlterTableMessage() {} + public JSONAlterTableMessage(String server, String servicePrincipal, String db, diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java index f1554e3..8a4db15 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java @@ -19,11 +19,11 @@ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hive.hcatalog.messaging.DropTableMessage; import org.apache.hive.hcatalog.messaging.InsertMessage; import org.codehaus.jackson.annotate.JsonProperty; import java.util.List; +import java.util.Map; /** * JSON implementation of InsertMessage. */ @@ -37,7 +37,10 @@ Long timestamp; @JsonProperty - List partitionValues, files; + List files; + + @JsonProperty + Map partKeyVals; /** * Default constructor, needed for Jackson. 
@@ -45,13 +48,13 @@ public JSONInsertMessage() {} public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - List partVals, List files, Long timestamp) { + Map partKeyVals, List files, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.timestamp = timestamp; - partitionValues = partVals; + this.partKeyVals = partKeyVals; this.files = files; checkValid(); } @@ -64,8 +67,8 @@ public JSONInsertMessage(String server, String servicePrincipal, String db, Stri public String getServer() { return server; } @Override - public List getPartitionValues() { - return partitionValues; + public Map getPartitionKeyValues() { + return partKeyVals; } @Override diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 06efb89..954cd3a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -109,9 +109,9 @@ public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partVals, + public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, List files) { - return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partVals, + return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partKeyVals, files, now()); } diff --git a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java b/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java index bf61dcf..827031a 100644 --- a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java +++ b/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java @@ -182,14 +182,14 @@ public void onMessage(Message msg) { AlterPartitionMessage message = deserializer.getAlterPartitionMessage(messageBody); Assert.assertEquals("mytbl", message.getTable()); Assert.assertEquals("mydb", message.getDB()); - Assert.assertEquals(1, message.getValues().size()); - Assert.assertEquals("2011", message.getValues().get(0)); + Assert.assertEquals(1, message.getKeyValues().size()); + Assert.assertTrue(message.getKeyValues().values().contains("2011")); HCatEventMessage message2 = MessagingUtils.getMessage(msg); Assert.assertTrue("Unexpected message-type.", message2 instanceof AlterPartitionMessage); Assert.assertEquals("mydb", message2.getDB()); Assert.assertEquals("mytbl", ((AlterPartitionMessage) message2).getTable()); - Assert.assertEquals(1, ((AlterPartitionMessage) message2).getValues().size()); - Assert.assertEquals("2011", ((AlterPartitionMessage) message2).getValues().get(0)); + Assert.assertEquals(1, ((AlterPartitionMessage) message2).getKeyValues().size()); + Assert.assertTrue(((AlterPartitionMessage) message2).getKeyValues().values().contains("2011")); } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java 
b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java index 2f830fd..0cc6117 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java @@ -34,7 +34,7 @@ public enum Scope { DB, TABLE, UNKNOWN }; - HCatNotificationEvent(NotificationEvent event) { + public HCatNotificationEvent(NotificationEvent event) { eventId = event.getEventId(); eventTime = event.getEventTime(); eventType = event.getEventType(); diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java index 0d714ff..ed40af7 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.api; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -259,6 +260,17 @@ public String getSerDe() { return this.sd.getSerdeInfo().getParameters(); } + public HCatPartition parameters(Map parameters){ + if (this.parameters == null){ + this.parameters = new HashMap(); + } + if (!this.parameters.equals(parameters)) { + this.parameters.clear(); + this.parameters.putAll(parameters); + } + return this; + } + public Map getParameters() { return this.parameters; } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java index 00f6d4e..bb9ff15 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java @@ -26,12 +26,11 @@ import java.util.List; /** - * This class is there to help testing, and to help initial development - * and will be the default Replication Task for under-development replication - * tasks to override. + * Noop replication task - a replication task that is actionable, + * does not need any further info, and returns NoopCommands. * - * This is not intended to be a permanent class, and will likely move to the test - * package after initial implementation. + * Useful for testing, and also for tasks that need to be represented + * but actually do nothing. 
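+ * + * Illustrative usage sketch (hcatNotificationEvent is an assumed variable, not part of this patch): + *   ReplicationTask task = ReplicationTask.create(hcatNotificationEvent); + *   task.getSrcWhCommands(); // yields NoopCommands for a noop task + *   task.getDstWhCommands(); // likewise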
*/ public class NoopReplicationTask extends ReplicationTask { diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java index 4bd3e81..8180c92 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.api.repl; import com.google.common.base.Function; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.hcatalog.api.HCatNotificationEvent; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.MessageFactory; @@ -36,6 +37,8 @@ protected Function dbNameMapping = null; protected static MessageFactory messageFactory = MessageFactory.getInstance(); + private static Factory factoryInstance = null; + private static String factoryClassName = null; public interface Factory { public ReplicationTask create(HCatNotificationEvent event); } @@ -77,20 +80,56 @@ public ReplicationTask create(HCatNotificationEvent event) { } } - private static Factory factoryInstance = null; + /** + * Returns factory instance for instantiating ReplicationTasks. + * + * The order of precedence is as follows: + * + * a) If a factory has already been instantiated, and is valid, use it. + * b) If a factoryClassName has been provided, through .resetFactory(), attempt to instantiate that. + * Throw an exception if instantiation fails. (This is useful for testing) + * c) If a hive.repl.task.factory has been set in the default hive conf, use that. Throw an + * exception if instantiation fails. + * d) Default to NoopFactory. + */ private static Factory getFactoryInstance() { if (factoryInstance == null){ + // instantiate new factory instance only if current one is not valid. + if (factoryClassName == null){ + // figure out which factory we're instantiating from HiveConf iff it's not been set on us directly. + factoryClassName = HiveConf.getVar(new HiveConf(), HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY); + } + if ((factoryClassName != null) && (!factoryClassName.isEmpty())){ + try { + Class<? extends Factory> factoryClass = (Class<? extends Factory>) Class.forName(factoryClassName); + factoryInstance = factoryClass.newInstance(); + } catch (Exception e) { + String invalidClassName = factoryClassName; + factoryClassName = null; // reset the classname for future evaluations. + throw new RuntimeException("Error instantiating ReplicationTask.Factory " + + HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname + "=" + invalidClassName, e); + } + } else { + // default to NoopFactory. + factoryInstance = new NoopFactory(); + } - // TODO: Eventually, we'll have a bit here that looks at a config param to instantiate - // the appropriate factory, with EXIMFactory being the default - that allows - // others to implement their own ReplicationTask.Factory for other replication - // implementations. - // That addition will be brought in by the EXIMFactory patch. - factoryInstance = new NoopFactory(); } return factoryInstance; } /** + * Method used for testing - allows resetting the ReplicationTaskFactory used + * @param factoryClass The new ReplicationTaskFactory to use.
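+ * + * Illustrative call (using the NoopFactory defined above): + *   ReplicationTask.resetFactory(NoopFactory.class); + *   // subsequent calls to ReplicationTask.create(...) will then use a NoopFactory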
+ */ + public static void resetFactory(Class<? extends Factory> factoryClass) { + if (factoryClass != null){ + factoryClassName = factoryClass.getName(); + } else { + factoryClassName = null; + } + factoryInstance = null; + } + + /** * Factory method to return appropriate subtype of ReplicationTask for given event * @param event HCatEventMessage returned by the notification subsystem * @return corresponding ReplicationTask @@ -168,7 +207,7 @@ public ReplicationTask withDstStagingDirProvider(StagingDirectoryProvider dstSta * That way, the default will then be that the destination table name is the same as the src table name * * If you want to use a Map mapping instead of a Function, - * simply call this function as .withTableNameMapping(com.google.common.base.Functions.forMap(tableMap)) + * simply call this function as .withTableNameMapping(ReplicationUtils.mapBasedFunction(tableMap)) * @param tableNameMapping * @return this replication task */ @@ -185,7 +224,7 @@ public ReplicationTask withTableNameMapping(Function tableNameMap * That way, the default will then be that the destination db name is the same as the src db name * * If you want to use a Map mapping instead of a Function, - * simply call this function as .withDb(com.google.common.base.Functions.forMap(dbMap)) + * simply call this function as .withDbNameMapping(ReplicationUtils.mapBasedFunction(dbMap)) * @param dbNameMapping * @return this replication task */ diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java index 299a25d..1e7901d 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java @@ -41,7 +41,7 @@ public class ReplicationUtils { - private final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString(); + public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString(); private ReplicationUtils(){ // dummy private constructor, since this class is a collection of static utility methods. } @@ -155,13 +155,32 @@ public static String toStringWordCharsOnly(String s){ } /** + * Utility function to use in conjunction with .withDbNameMapping / .withTableNameMapping, + * if we desire usage of a Map instead of implementing a Function. + */ + public static Function<String, String> mapBasedFunction(final Map<String, String> m){ + return new Function<String, String>(){ + + @Nullable + @Override + public String apply(@Nullable String s) { + if ((m == null) || (!m.containsKey(s))){ + return s; + } + return m.get(s); + } + }; + } + + /** * Return a mapping from a given map function if available, and the key itself if not. */ public static String mapIfMapAvailable(String s, Function mapping){ try { return mapping.apply(s); } catch (IllegalArgumentException iae){ - // The key wasn't present in the mapping, return the key itself, since no mapping was available + // The key wasn't present in the mapping, and the function didn't take care of returning + // a default value. 
We return the key itself, since no mapping was available return s; } } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropDatabaseCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropDatabaseCommand.java new file mode 100644 index 0000000..e40d4d3 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropDatabaseCommand.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class DropDatabaseCommand extends HiveCommand { + private String dbName = null; + private long eventId; + + public DropDatabaseCommand(String dbName, long eventId) { + this.dbName = dbName; + this.eventId = eventId; + } + + public DropDatabaseCommand(){ + // trivial ctor to support Writable reflections instantiation + // do not expect to use this object as-is, unless you call + // readFields after using this ctor + } + + @Override + public List get() { + // DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE]; + StringBuilder sb = new StringBuilder(); + sb.append("DROP DATABASE IF EXISTS "); + sb.append(dbName); + sb.append(" CASCADE"); + return Arrays.asList(sb.toString()); + } + + @Override + public boolean isRetriable() { + return true; + } + + @Override + public boolean isUndoable() { + return false; + } + + @Override + public List getUndo() { + throw new UnsupportedOperationException("getUndo called on command that returned false for isUndoable"); + } + + @Override + public List cleanupLocationsPerRetry() { + return new ArrayList(); + } + + @Override + public List cleanupLocationsAfterEvent() { + return new ArrayList(); + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, dbName); + ReaderWriter.writeDatum(dataOutput, Long.valueOf(eventId)); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + dbName = (String)ReaderWriter.readDatum(dataInput); + eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + } + + @Override + void run(HCatClient client, Configuration conf) throws HCatException { + client.dropDatabase(this.dbName, true, HCatClient.DropDBMode.CASCADE); + } + + 
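+ // Note: run() above mirrors get() - the boolean argument is presumed to be ifExists, + // making this the HCatClient equivalent of "DROP DATABASE IF EXISTS <dbName> CASCADE".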
@Override + boolean isRunnableFromHCatClient() { + return true; + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropPartitionCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropPartitionCommand.java new file mode 100644 index 0000000..c3afec9 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropPartitionCommand.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class DropPartitionCommand extends HiveCommand { + private long eventId; + private String dbName; + private String tableName; + private Map ptnDesc; + private boolean isReplicatedEvent = false; + + public DropPartitionCommand(String dbName, String tableName, Map ptnDesc, boolean isReplicatedEvent, long eventId) { + this.dbName = dbName; + this.tableName = tableName; + this.ptnDesc = ptnDesc; + this.isReplicatedEvent = isReplicatedEvent; + this.eventId = eventId; + } + + public DropPartitionCommand(){ + // trivial ctor to support Writable reflections instantiation + // do not expect to use this object as-is, unless you call + // readFields after using this ctor + } + + @Override + public List get() { + // ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec, PARTITION partition_spec,...; + StringBuilder sb = new StringBuilder(); + sb.append("ALTER TABLE "); + sb.append(dbName); + sb.append('.'); + sb.append(tableName); + sb.append(" DROP IF EXISTS"); + sb.append(ReplicationUtils.partitionDescriptor(ptnDesc)); + if (isReplicatedEvent){ + sb.append(" FOR REPLICATION(\'"); + sb.append(eventId); + sb.append("\')"); + } + return Arrays.asList(sb.toString()); + } + + @Override + public boolean isRetriable() { + return true; + } + + @Override + public boolean isUndoable() { + return false; + } + + @Override + public List getUndo() { + throw new UnsupportedOperationException("getUndo called on command that returned false for isUndoable"); + } + + @Override + public List cleanupLocationsPerRetry() { + return new ArrayList(); + } + + @Override + public List cleanupLocationsAfterEvent() { + return new ArrayList(); + } + + @Override + public long getEventId() { + return eventId; 
+ } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, dbName); + ReaderWriter.writeDatum(dataOutput, tableName); + ReaderWriter.writeDatum(dataOutput, ptnDesc); + ReaderWriter.writeDatum(dataOutput, Boolean.valueOf(isReplicatedEvent)); + ReaderWriter.writeDatum(dataOutput, Long.valueOf(eventId)); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + dbName = (String)ReaderWriter.readDatum(dataInput); + tableName = (String)ReaderWriter.readDatum(dataInput); + ptnDesc = (Map)ReaderWriter.readDatum(dataInput); + isReplicatedEvent = ((Boolean)ReaderWriter.readDatum(dataInput)).booleanValue(); + eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + } + + @Override + void run(HCatClient client, Configuration conf) throws HCatException { + client.dropPartitions(dbName, tableName, ptnDesc, true); // No support for .. FOR REPLICATION semantics from HCatClient yet. + } + + @Override + boolean isRunnableFromHCatClient() { + return !isReplicatedEvent; // No support for .. FOR REPLICATION semantics from HCatClient yet. + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropTableCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropTableCommand.java new file mode 100644 index 0000000..b04040b --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/DropTableCommand.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class DropTableCommand extends HiveCommand { + private long eventId; + private String dbName = null; + private String tableName = null; + private boolean isReplicatedEvent = false; + + public DropTableCommand(String dbName, String tableName, boolean isReplicatedEvent, long eventId) { + this.dbName = dbName; + this.tableName = tableName; + this.isReplicatedEvent = isReplicatedEvent; + this.eventId = eventId; + } + + public DropTableCommand() { + // trivial ctor to support Writable reflections instantiation + // do not expect to use this object as-is, unless you call + // readFields after using this ctor + } + + @Override + public List get() { + // DROP TABLE [IF EXISTS] table_name; + StringBuilder sb = new StringBuilder(); + sb.append("DROP TABLE IF EXISTS "); + sb.append(dbName); + sb.append('.'); + sb.append(tableName); // TODO: Handle quoted tablenames + if (isReplicatedEvent){ + sb.append(" FOR REPLICATION(\'"); + sb.append(eventId); + sb.append("\')"); + } + return Arrays.asList(sb.toString()); + } + + @Override + public boolean isRetriable() { + return true; + } + + @Override + public boolean isUndoable() { + return false; + } + + @Override + public List getUndo() { + throw new UnsupportedOperationException("getUndo called on command that returned false for isUndoable"); + } + + @Override + public List cleanupLocationsPerRetry() { + return new ArrayList(); + } + + @Override + public List cleanupLocationsAfterEvent() { + return new ArrayList(); + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, dbName); + ReaderWriter.writeDatum(dataOutput, tableName); + ReaderWriter.writeDatum(dataOutput, Boolean.valueOf(isReplicatedEvent)); + ReaderWriter.writeDatum(dataOutput, Long.valueOf(eventId)); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + dbName = (String)ReaderWriter.readDatum(dataInput); + tableName = (String)ReaderWriter.readDatum(dataInput); + isReplicatedEvent = ((Boolean)ReaderWriter.readDatum(dataInput)).booleanValue(); + eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + } + + @Override + void run(HCatClient client, Configuration conf) throws HCatException { + client.dropTable(dbName, tableName, true); // No support for .. FOR REPLICATION semantics from HCatClient yet. + } + + @Override + boolean isRunnableFromHCatClient() { + return !isReplicatedEvent; // No support for .. FOR REPLICATION semantics from HCatClient yet. + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/ExportCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/ExportCommand.java new file mode 100644 index 0000000..4ff11c8 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/ExportCommand.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class ExportCommand extends HiveCommand { + private String exportLocation; + private String dbName = null; + private String tableName = null; + private Map ptnDesc = null; + private long eventId; + private boolean isMetadataOnly = false; + + public ExportCommand(String dbName, String tableName, Map ptnDesc, + String exportLocation, boolean isMetadataOnly, long eventId) { + this.dbName = dbName; + this.tableName = tableName; + this.ptnDesc = ptnDesc; + this.exportLocation = exportLocation; + this.isMetadataOnly = isMetadataOnly; + this.eventId = eventId; + } + + public ExportCommand(){ + // trivial ctor to support Writable reflections instantiation + // do not expect to use this object as-is, unless you call + // readFields after using this ctor + } + + @Override + public List get() { + // EXPORT TABLE tablename [PARTITION (part_column="value"[, ...])] + // TO 'export_target_path' + StringBuilder sb = new StringBuilder(); + sb.append("EXPORT TABLE "); + sb.append(dbName); + sb.append("."); + sb.append(tableName); // TODO: Handle quoted tablenames + sb.append(ReplicationUtils.partitionDescriptor(ptnDesc)); + sb.append(" TO '"); + sb.append(exportLocation); + sb.append("\' FOR "); + if (isMetadataOnly){ + sb.append("METADATA "); + } + sb.append("REPLICATION(\'"); + sb.append(eventId); + sb.append("\')"); + return Arrays.asList(sb.toString()); + } + + @Override + public boolean isRetriable() { + return true; // Export is trivially retriable (after clearing out the staging dir provided.) + } + + @Override + public boolean isUndoable() { + return true; // Export is trivially undoable - in that nothing needs doing to undo it. + } + + @Override + public List getUndo() { + return new ArrayList(); // Nothing to undo. 
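+ // (Export writes only to exportLocation; cleanupLocationsPerRetry() and + // cleanupLocationsAfterEvent() below return that directory, so retries and + // event completion clean up the staged data.)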
+ } + + @Override + public List cleanupLocationsPerRetry() { + return Arrays.asList(exportLocation); + } + + @Override + public List cleanupLocationsAfterEvent() { + return Arrays.asList(exportLocation); + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, dbName); + ReaderWriter.writeDatum(dataOutput, tableName); + ReaderWriter.writeDatum(dataOutput, ptnDesc); + ReaderWriter.writeDatum(dataOutput, exportLocation); + ReaderWriter.writeDatum(dataOutput,Boolean.valueOf(isMetadataOnly)); + ReaderWriter.writeDatum(dataOutput,Long.valueOf(eventId)); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + dbName = (String)ReaderWriter.readDatum(dataInput); + tableName = (String)ReaderWriter.readDatum(dataInput); + ptnDesc = (Map)ReaderWriter.readDatum(dataInput); + exportLocation = (String)ReaderWriter.readDatum(dataInput); + isMetadataOnly = ((Boolean)ReaderWriter.readDatum(dataInput)).booleanValue(); + eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + } + + @Override + void run(HCatClient client, Configuration conf) throws HCatException { + // TODO : Implement + throw new IllegalStateException("Not implemented yet! Test isRunnableFromHCatClient() before calling"); + } + + @Override + boolean isRunnableFromHCatClient() { + return false; // There is currently no way to run export from HCatClient. + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/HiveCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/HiveCommand.java new file mode 100644 index 0000000..a1fd67e --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/HiveCommand.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.common.HCatException; + +/** + * Dummy marker class, to indicate that all those that extend this are + * Commands whose get() method returns hive ql commands that can be + * sent to HiveDriver. + */ +public abstract class HiveCommand implements Command { + + // WARNING : This call is still under design and should be considered highly + // experimental - there are no guarantees being made with it, and if it turns + // out that this call is not supportable, it will be removed. 
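+ // Illustrative consumption sketch (the driver object is an assumption, not part of this patch): + //   for (String hql : hiveCommand.get()) { + //     driver.run(hql); // e.g. an org.apache.hadoop.hive.ql.Driver instance + //   }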
+ abstract void run(HCatClient client, Configuration conf) throws HCatException; + abstract boolean isRunnableFromHCatClient(); // returns true if run() can be run for this Command +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/ImportCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/ImportCommand.java new file mode 100644 index 0000000..d9e3937 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/ImportCommand.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class ImportCommand extends HiveCommand { + private String importLocation; + private String dbName = null; + private String tableName = null; + private Map ptnDesc = null; + private long eventId; + private boolean isDefinitionOnly = false; + + + // NOTE: The current implementation does not allow importing to an "EXTERNAL" location. + // This is intentional, since we want the destination tables to be "managed" tables. + // If this assumption should change at some point in the future, ImportSemanticAnalyzer + // will need some of its checks changed to allow for "replacing" external tables. 
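+ // For illustration, assuming ReplicationUtils.partitionDescriptor(ptnDesc) renders the + // PARTITION clause, get() for dbName="mydb", tableName="mytbl", ptnDesc={dt=2011}, + // importLocation="/tmp/repl/1" would yield something like: + //   IMPORT TABLE mydb.mytbl PARTITION (dt='2011') FROM '/tmp/repl/1'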
+ + public ImportCommand(String dbName, String tableName, Map ptnDesc, + String importLocation, boolean isDefinitionOnly, long eventId) { + this.dbName = dbName; + this.tableName = tableName; + this.ptnDesc = ptnDesc; + this.importLocation = importLocation; + this.isDefinitionOnly = isDefinitionOnly; + this.eventId = eventId; + } + + public ImportCommand(){ + // trivial ctor to support Writable reflections instantiation + // do not expect to use this object as-is, unless you call + // readFields after using this ctor + } + + @Override + public List get() { + // IMPORT [[EXTERNAL] TABLE new_or_original_tablename [PARTITION (part_column="value"[, ...])]] + // FROM 'source_path' + // [LOCATION 'import_target_path'] + StringBuilder sb = new StringBuilder(); + sb.append("IMPORT TABLE "); + sb.append(dbName); + sb.append('.'); + sb.append(tableName); // TODO: Handle quoted tablenames + sb.append(ReplicationUtils.partitionDescriptor(ptnDesc)); + sb.append(" FROM '"); + sb.append(importLocation); + sb.append('\''); + return Arrays.asList(sb.toString()); + } + + @Override + public boolean isRetriable() { + return true; + // Repl imports are replace-imports, and thus, are idempotent. + // Note that this assumes that this ImportCommand is running on an export dump + // created using EXPORT ... FOR REPLICATION. If the scope of ImportCommand + // were to eventually expand to importing dumps created by regular exports, + // then this needs updating. + } + + @Override + public boolean isUndoable() { + return false; // Alters and replacements are not undoable if they've taken effect already. They are retriable though. + } + + @Override + public List getUndo() { + // throw new UnsupportedOperationException("Attempted to getUndo() on a repl import that does not support undo."); + return new ArrayList(); + // TODO: Decide if we want to throw an exception or return an empty list. + // This would depend on what we intend to be the isUndoable semantic. + } + + @Override + public List cleanupLocationsPerRetry() { + return new ArrayList(); + } + + @Override + public List cleanupLocationsAfterEvent() { + return Arrays.asList(importLocation); + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, dbName); + ReaderWriter.writeDatum(dataOutput, tableName); + ReaderWriter.writeDatum(dataOutput, ptnDesc); + ReaderWriter.writeDatum(dataOutput, importLocation); + ReaderWriter.writeDatum(dataOutput,Boolean.valueOf(isDefinitionOnly)); + ReaderWriter.writeDatum(dataOutput,Long.valueOf(eventId)); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + dbName = (String)ReaderWriter.readDatum(dataInput); + tableName = (String)ReaderWriter.readDatum(dataInput); + ptnDesc = (Map)ReaderWriter.readDatum(dataInput); + importLocation = (String)ReaderWriter.readDatum(dataInput); + isDefinitionOnly = ((Boolean)ReaderWriter.readDatum(dataInput)).booleanValue(); + eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + } + + @Override + void run(HCatClient client, Configuration conf) throws HCatException { + // TODO : Implement + throw new IllegalStateException("Not implemented yet! Test isRunnableFromHCatClient() before calling"); + } + + @Override + boolean isRunnableFromHCatClient() { + return false; // There is currently no way to run import from HCatClient. 
+ } +} + diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AddPartitionReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AddPartitionReplicationTask.java new file mode 100644 index 0000000..e57cc66 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AddPartitionReplicationTask.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.AddPartitionMessage; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Map; + +public class AddPartitionReplicationTask extends ReplicationTask { + + AddPartitionMessage addPartitionMessage = null; + + public AddPartitionReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event,HCatConstants.HCAT_ADD_PARTITION_EVENT); + addPartitionMessage = messageFactory.getDeserializer().getAddPartitionMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + // we need staging directories as long as at least one partition was added + return (!addPartitionMessage.getPartitions().isEmpty()); + } + + + public Iterable getSrcWhCommands() { + verifyActionable(); + if (addPartitionMessage.getPartitions().isEmpty()){ + return Arrays.asList(new NoopCommand(event.getEventId())); + } + + return Iterables.transform(addPartitionMessage.getPartitions(), new Function<Map<String, String>, Command>(){ + @Override + public Command apply(@Nullable Map<String, String> ptnDesc) { + return new ExportCommand( + addPartitionMessage.getDB(), + addPartitionMessage.getTable(), + ptnDesc, + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + addPartitionMessage.getDB(), + addPartitionMessage.getTable(), + ptnDesc) + ), + false, + event.getEventId() + ); + } + }); + + } + + public Iterable getDstWhCommands() { + verifyActionable(); + if (addPartitionMessage.getPartitions().isEmpty()){ + return Arrays.asList(new NoopCommand(event.getEventId())); + } + + final String dstDbName = 
ReplicationUtils.mapIfMapAvailable(addPartitionMessage.getDB(), dbNameMapping); + final String dstTableName = ReplicationUtils.mapIfMapAvailable(addPartitionMessage.getTable(), tableNameMapping); + + return Iterables.transform(addPartitionMessage.getPartitions(), new Function<Map<String, String>, Command>() { + @Override + public Command apply(@Nullable Map<String, String> ptnDesc) { + return new ImportCommand( + dstDbName, + dstTableName, + ptnDesc, + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + addPartitionMessage.getDB(), // Note - important to retain the same key as the export + addPartitionMessage.getTable(), + ptnDesc) + ), + false, + event.getEventId() + ); + } + }); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AlterPartitionReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AlterPartitionReplicationTask.java new file mode 100644 index 0000000..f1f7793 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AlterPartitionReplicationTask.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; + +import java.util.Arrays; + +public class AlterPartitionReplicationTask extends ReplicationTask { + + AlterPartitionMessage alterPartitionMessage = null; + + public AlterPartitionReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event,HCatConstants.HCAT_ALTER_PARTITION_EVENT); + alterPartitionMessage = messageFactory.getDeserializer().getAlterPartitionMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return true; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + + return Arrays.asList(new ExportCommand( + alterPartitionMessage.getDB(), + alterPartitionMessage.getTable(), + alterPartitionMessage.getKeyValues(), + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + alterPartitionMessage.getDB(), + alterPartitionMessage.getTable(), + alterPartitionMessage.getKeyValues()) + ), + true, + event.getEventId() + )); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + + final String dstDbName = ReplicationUtils.mapIfMapAvailable(alterPartitionMessage.getDB(), dbNameMapping); + final String dstTableName = ReplicationUtils.mapIfMapAvailable(alterPartitionMessage.getTable(), tableNameMapping); + + return Arrays.asList(new ImportCommand( + dstDbName, + dstTableName, + alterPartitionMessage.getKeyValues(), + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + alterPartitionMessage.getDB(), // Note - important to retain the same key as the export + alterPartitionMessage.getTable(), + alterPartitionMessage.getKeyValues()) + ), + true, + event.getEventId() + )); + } +} + diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AlterTableReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AlterTableReplicationTask.java new file mode 100644 index 0000000..1e787d5 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/AlterTableReplicationTask.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.AlterTableMessage; + +import java.util.Arrays; + +public class AlterTableReplicationTask extends ReplicationTask { + private final AlterTableMessage alterTableMessage; + + public AlterTableReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_ALTER_TABLE_EVENT); + alterTableMessage = messageFactory.getDeserializer().getAlterTableMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return true; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + final String dbName = alterTableMessage.getDB(); + final String tableName = alterTableMessage.getTable(); + return Arrays.asList(new ExportCommand( + dbName, + tableName, + null, + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, + tableName, + null) + ), + true, + event.getEventId() + )); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dbName = alterTableMessage.getDB(); + final String tableName = alterTableMessage.getTable(); + return Arrays.asList(new ImportCommand( + ReplicationUtils.mapIfMapAvailable(dbName, dbNameMapping), + ReplicationUtils.mapIfMapAvailable(tableName, tableNameMapping), + null, + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, // Note - important to retain the same key as the export + tableName, + null) + ), + true, + event.getEventId() + )); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateDatabaseReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateDatabaseReplicationTask.java new file mode 100644 index 0000000..8b96a4a --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateDatabaseReplicationTask.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
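[Editorial note on the tasks above: the boolean passed to ExportCommand/ImportCommand just before the event id selects a metadata-only operation. Per the tests later in this patch (testBasicReplEximCommands vs. testMetadataReplEximCommands), the alter-table and alter-partition tasks pass true, shipping only metadata, while the create/add-partition/insert tasks pass false so data files ship too. A minimal sketch contrasting the two forms; the names, staging paths, and event id 42 are illustrative:

    import org.apache.hive.hcatalog.api.repl.commands.ExportCommand;

    public class ExportFlavorsSketch {
      public static void main(String[] args) {
        // Metadata-only export, as issued for ALTER events (repl.scope=metadata):
        ExportCommand metadataOnly =
            new ExportCommand("testdb", "testtable", null, "/tmp/stage/md", true, 42);
        // Full export of metadata plus data, as issued for CREATE/INSERT events (repl.scope=all):
        ExportCommand withData =
            new ExportCommand("testdb", "testtable", null, "/tmp/stage/full", false, 42);
        // Either way, a Command is just HiveQL text for a Driver to run:
        System.out.println(metadataOnly.get().get(0));
        System.out.println(withData.get().get(0));
      }
    }
]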
+ */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.NoopReplicationTask; +import org.apache.hive.hcatalog.common.HCatConstants; + +public class CreateDatabaseReplicationTask extends NoopReplicationTask { + + // "CREATE DATABASE" is specifically not replicated across, per design, since if a user + // drops a database and recreates another with the same name, we want to distinguish + // between the two. We will replicate the drop across, but after that, the goal is + // that if a new db is created, a new replication definition should be created in + // the replication implementer above this. Thus, we extend NoopReplicationTask and + // the only additional thing we do is validate the event type. + + public CreateDatabaseReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_CREATE_DATABASE_EVENT); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateTableReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateTableReplicationTask.java new file mode 100644 index 0000000..7634348 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/CreateTableReplicationTask.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.CreateTableMessage; + +import java.util.Arrays; + +public class CreateTableReplicationTask extends ReplicationTask { + + private CreateTableMessage createTableMessage = null; + + public CreateTableReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_CREATE_TABLE_EVENT); + createTableMessage = messageFactory.getDeserializer().getCreateTableMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return true; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + final String dbName = createTableMessage.getDB(); + final String tableName = createTableMessage.getTable(); + return Arrays.asList(new ExportCommand( + dbName, + tableName, + null, + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, + tableName, + null) + ), + false, + event.getEventId() + )); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dbName = createTableMessage.getDB(); + final String tableName = createTableMessage.getTable(); + return Arrays.asList(new ImportCommand( + ReplicationUtils.mapIfMapAvailable(dbName, dbNameMapping), + ReplicationUtils.mapIfMapAvailable(tableName, tableNameMapping), + null, + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, // Note - important to retain the same key as the export + tableName, + null) + ), + false, + event.getEventId() + )); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropDatabaseReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropDatabaseReplicationTask.java new file mode 100644 index 0000000..db831d0 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropDatabaseReplicationTask.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
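[Editorial note: the recurring "// Note - important to retain the same key as the export" comment in the tasks above encodes a contract, namely that both warehouses must derive their staging directory from the same ReplicationUtils.getUniqueKey(...), computed over the source-side db/table names even when name mappings are in play. A small sketch of that symmetry; the directory roots and event id are illustrative:

    import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
    import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;

    public class StagingKeySketch {
      public static void main(String[] args) {
        StagingDirectoryProvider src = new StagingDirectoryProvider.TrivialImpl("/tmp/src", "/");
        StagingDirectoryProvider dst = new StagingDirectoryProvider.TrivialImpl("/tmp/dst", "/");

        // Both sides key off the un-mapped source names for the same event:
        String key = ReplicationUtils.getUniqueKey(42, "testdb", "testtable", null);

        // The roots differ per warehouse, but the shared key is what lets the
        // copier pair each export directory with its import directory.
        System.out.println(src.getStagingDirectory(key));
        System.out.println(dst.getStagingDirectory(key));
      }
    }
]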
+ */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.DropDatabaseCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.DropDatabaseMessage; + +import java.util.ArrayList; +import java.util.Arrays; + +public class DropDatabaseReplicationTask extends ReplicationTask { + private DropDatabaseMessage dropDatabaseMessage = null; + + public DropDatabaseReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_DROP_DATABASE_EVENT); + dropDatabaseMessage = messageFactory.getDeserializer().getDropDatabaseMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return false; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + return Arrays.asList(new NoopCommand(event.getEventId())); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dstDbName = ReplicationUtils.mapIfMapAvailable(dropDatabaseMessage.getDB(), dbNameMapping); + return Arrays.asList(new DropDatabaseCommand(dstDbName, event.getEventId())); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropPartitionReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropPartitionReplicationTask.java new file mode 100644 index 0000000..1fdc0f4 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropPartitionReplicationTask.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
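[Editorial note: DropDatabaseReplicationTask above is the simplest consumer of the name-mapping hooks; mapIfMapAvailable applies the Guava Function supplied via withDbNameMapping/withTableNameMapping. Assuming, as the method name suggests, that a null mapping leaves the name unchanged, usage looks roughly like this sketch (the "_replica" suffix is illustrative):

    import com.google.common.base.Function;
    import org.apache.hive.hcatalog.api.repl.ReplicationUtils;

    public class NameMappingSketch {
      public static void main(String[] args) {
        // Illustrative mapping: land every replicated db under a "_replica" suffix.
        Function<String, String> addSuffix = new Function<String, String>() {
          @Override
          public String apply(String s) {
            return (s == null) ? null : s + "_replica";
          }
        };
        System.out.println(ReplicationUtils.mapIfMapAvailable("testdb", addSuffix)); // testdb_replica
        System.out.println(ReplicationUtils.mapIfMapAvailable("testdb", null));      // testdb (assumed identity)
      }
    }
]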
+ */ + +package org.apache.hive.hcatalog.api.repl.exim; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.DropPartitionCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.DropPartitionMessage; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; + +public class DropPartitionReplicationTask extends ReplicationTask { + + DropPartitionMessage dropPartitionMessage = null; + + public DropPartitionReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_DROP_PARTITION_EVENT); + dropPartitionMessage = messageFactory.getDeserializer().getDropPartitionMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return false; + } + + + public Iterable getSrcWhCommands() { + verifyActionable(); + return Arrays.asList(new NoopCommand(event.getEventId())); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + + final String dstDbName = ReplicationUtils.mapIfMapAvailable(dropPartitionMessage.getDB(), dbNameMapping); + final String dstTableName = ReplicationUtils.mapIfMapAvailable(dropPartitionMessage.getTable(), tableNameMapping); + + return Iterables.transform(dropPartitionMessage.getPartitions(), new Function, Command>() { + @Override + public Command apply(@Nullable Map ptnDesc) { + return new DropPartitionCommand( + dstDbName, + dstTableName, + ptnDesc, + true, + event.getEventId() + ); + } + }); + } +} + diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropTableReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropTableReplicationTask.java new file mode 100644 index 0000000..1bc689f --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/DropTableReplicationTask.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
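[Editorial note: unlike the drop-table task, DropPartitionReplicationTask above fans a single event out into one DropPartitionCommand per partition spec, and does so lazily through Guava's Iterables.transform. A consumer that needs a count, or more than one pass, should materialize the iterable once, as the tests later in this patch do:

    import com.google.common.collect.Lists;
    import java.util.List;
    import org.apache.hive.hcatalog.api.repl.Command;
    import org.apache.hive.hcatalog.api.repl.ReplicationTask;

    public class FanOutSketch {
      // One event, many partitions: each partition spec becomes its own Command.
      static List<Command> materializedDstCommands(ReplicationTask task) {
        return Lists.newArrayList(task.getDstWhCommands());
      }
    }
]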
+ */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.DropTableCommand; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.DropTableMessage; + +import java.util.ArrayList; +import java.util.Arrays; + +public class DropTableReplicationTask extends ReplicationTask { + private DropTableMessage dropTableMessage = null; + + public DropTableReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_DROP_TABLE_EVENT); + dropTableMessage = messageFactory.getDeserializer().getDropTableMessage(event.getMessage()); + } + + public boolean needsStagingDirs(){ + return false; + } + + public Iterable getSrcWhCommands() { + verifyActionable(); + return Arrays.asList(new NoopCommand(event.getEventId())); + } + + public Iterable getDstWhCommands() { + verifyActionable(); + final String dstDbName = ReplicationUtils.mapIfMapAvailable(dropTableMessage.getDB(), dbNameMapping); + final String dstTableName = ReplicationUtils.mapIfMapAvailable(dropTableMessage.getTable(), tableNameMapping); + return Arrays.asList(new DropTableCommand(dstDbName, dstTableName, true, event.getEventId())); + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java new file mode 100644 index 0000000..8063aa3 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/EximReplicationTaskFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.NoopReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.common.HCatConstants; + +/** + * EximReplicationTaskFactory is an export-import based ReplicationTask.Factory. + * + * Its primary mode of enabling replication is to translate each event it gets + * from the notification subsystem into Hive commands that essentially export data + * to be copied over and imported on the other end. + * + * The Commands that Tasks return here are expected to be Hive commands.
+ */ +public class EximReplicationTaskFactory implements ReplicationTask.Factory { + public ReplicationTask create(HCatNotificationEvent event){ + // TODO : Java 1.7+ supports switching on Strings, but IDEs don't all seem to know that. + // An if/else cascade is fine for now, but we should eventually remove this. Also, I didn't + // want to create another enum just for this. + if (event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) { + return new CreateDatabaseReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { + return new DropDatabaseReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) { + return new CreateTableReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { + return new DropTableReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { + return new AddPartitionReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { + return new DropPartitionReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_ALTER_TABLE_EVENT)) { + return new AlterTableReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_ALTER_PARTITION_EVENT)) { + return new AlterPartitionReplicationTask(event); + } else if (event.getEventType().equals(HCatConstants.HCAT_INSERT_EVENT)) { + return new InsertReplicationTask(event); + } else { + throw new IllegalStateException("Unrecognized event type " + event.getEventType() + ", no replication task available"); + } + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/InsertReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/InsertReplicationTask.java new file mode 100644 index 0000000..cf93e78 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/exim/InsertReplicationTask.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
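[Editorial note: putting the factory above together with the task API, a replication driver on the source warehouse would look roughly like the sketch below. The staging roots are illustrative, and runOnSourceWarehouse is a hypothetical stand-in for however the caller executes HiveQL (e.g. a ql Driver):

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hive.hcatalog.api.HCatNotificationEvent;
    import org.apache.hive.hcatalog.api.repl.Command;
    import org.apache.hive.hcatalog.api.repl.ReplicationTask;
    import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
    import org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory;

    public class SourceWarehouseDriverSketch {
      public static void processEvent(NotificationEvent event) {
        ReplicationTask.resetFactory(EximReplicationTaskFactory.class);
        ReplicationTask task = ReplicationTask.create(new HCatNotificationEvent(event));
        if (task.needsStagingDirs()) {
          // Providers must be supplied before the task becomes actionable.
          task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl("/tmp/src", "/"))
              .withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl("/tmp/dst", "/"));
        }
        for (Command cmd : task.getSrcWhCommands()) {
          for (String hql : cmd.get()) {
            runOnSourceWarehouse(hql);
          }
        }
        // getDstWhCommands() would similarly be serialized and shipped to the destination side.
      }

      private static void runOnSourceWarehouse(String hql) { /* hypothetical executor */ }
    }
]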
+ */ +package org.apache.hive.hcatalog.api.repl.exim; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.commands.ExportCommand; +import org.apache.hive.hcatalog.api.repl.commands.ImportCommand; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.InsertMessage; + +import java.util.Arrays; +import java.util.Map; + +public class InsertReplicationTask extends ReplicationTask { + private final InsertMessage insertMessage; + + public InsertReplicationTask(HCatNotificationEvent event) { + super(event); + validateEventType(event, HCatConstants.HCAT_INSERT_EVENT); + insertMessage = messageFactory.getDeserializer().getInsertMessage(event.getMessage()); + } + + + public boolean needsStagingDirs(){ + // an insert always ships the files it added, so staging directories are always needed + return true; + } + + @Override + public Iterable getSrcWhCommands() { + verifyActionable(); + + final String dbName = insertMessage.getDB(); + final String tableName = insertMessage.getTable(); + final Map ptnDesc = insertMessage.getPartitionKeyValues(); + // Note : ptnDesc can be null or empty for non-ptn table + + return Arrays.asList(new ExportCommand( + dbName, + tableName, + ptnDesc, + srcStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, + tableName, + ptnDesc) + ), + false, + event.getEventId() + )); + + } + + public Iterable getDstWhCommands() { + verifyActionable(); + + final String dbName = insertMessage.getDB(); + final String tableName = insertMessage.getTable(); + final Map ptnDesc = insertMessage.getPartitionKeyValues(); + // Note : ptnDesc can be null or empty for non-ptn table + + return Arrays.asList(new ImportCommand( + ReplicationUtils.mapIfMapAvailable(dbName, dbNameMapping), + ReplicationUtils.mapIfMapAvailable(tableName, tableNameMapping), + ptnDesc, + dstStagingDirProvider.getStagingDirectory( + ReplicationUtils.getUniqueKey( + getEvent().getEventId(), + dbName, // Note - important to retain the same key as the export + tableName, + ptnDesc) + ), + false, + event.getEventId() + )); + + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index eb21a0f..e4f4736 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -87,7 +87,7 @@ private static boolean useExternalMS = false; private static boolean useExternalMSForReplication = false; - private static class RunMS implements Runnable { + public static class RunMS implements Runnable { private final String msPort; private List args = new ArrayList(); @@ -156,6 +156,11 @@ public static void startMetaStoreServer() throws Exception { System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); } + + public static HiveConf getConf(){ + return hcatConf; + } + public static String fixPath(String path) { if(!Shell.WINDOWS) { return path; @@ -166,6 +171,7 @@ public static String fixPath(String path) { } return expectedDir; } + @Test public void testBasicDDLCommands() throws
Exception { String db = "testdb"; diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/CommandTestUtils.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/CommandTestUtils.java new file mode 100644 index 0000000..5087069 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/CommandTestUtils.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Class which provides several useful methods to test commands, but is itself not a test. + */ +public class CommandTestUtils { + + private static Log LOG = LogFactory.getLog(CommandTestUtils.class.getName()); + + + public static void compareCommands(Command expected, Command actual, boolean ignoreSortOrder) { + // The reason we use compare-command, rather than simply getting the serialized output and comparing + // for partition-based commands is that the partition specification order can be different in different + // serializations, but still be effectively the same. (a="42",b="abc") should be the same as (b="abc",a="42") + assertEquals(expected.getClass(),actual.getClass()); + assertEquals(expected.getEventId(),actual.getEventId()); + assertEquals(expected.isUndoable(),actual.isUndoable()); + assertEquals(expected.isRetriable(),actual.isRetriable()); + + assertEquals(expected.get().size(),actual.get().size()); + Iterator actualIter = actual.get().iterator(); + for (String s : expected.get()){ + if (ignoreSortOrder){ + // compare sorted strings, rather than comparing exact strings. 
+ assertSortedEquals(s, actualIter.next()); + } else { + assertEquals(s,actualIter.next()); + } + } + + if (expected.isUndoable()){ + Iterator actualUndoIter = actual.getUndo().iterator(); + for (String s: expected.getUndo()){ + if (ignoreSortOrder){ + assertSortedEquals(s,actualUndoIter.next()); + } else { + assertEquals(s,actualUndoIter.next()); + } + } + } + } + + private static void assertSortedEquals(String expected, String actual) { + char[] expectedChars = expected.toCharArray(); + Arrays.sort(expectedChars); + char[] actualChars = actual.toCharArray(); + Arrays.sort(actualChars); + assertEquals(String.valueOf(expectedChars), String.valueOf(actualChars)); + } + + public static void testCommandSerialization(Command cmd) { + String serializedCmd = null; + try { + serializedCmd = ReplicationUtils.serializeCommand(cmd); + } catch (IOException e) { + LOG.error("Serialization error",e); + assertNull(e); // error out. + } + + Command cmd2 = null; + try { + cmd2 = ReplicationUtils.deserializeCommand(serializedCmd); + } catch (IOException e) { + LOG.error("Deserialization error",e); + assertNull(e); // error out. + } + + assertEquals(cmd.getClass(),cmd2.getClass()); + assertEquals(cmd.getEventId(), cmd2.getEventId()); + assertEquals(cmd.get(), cmd2.get()); + assertEquals(cmd.isUndoable(),cmd2.isUndoable()); + if (cmd.isUndoable()){ + assertEquals(cmd.getUndo(),cmd2.getUndo()); + } + assertEquals(cmd.isRetriable(),cmd2.isRetriable()); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java new file mode 100644 index 0000000..17c68f9 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
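[Editorial note: testCommandSerialization above pins down the wire format a Command travels in between the two warehouses; end to end, the round trip looks like this sketch (database/table names and the event id are illustrative):

    import java.io.IOException;
    import org.apache.hive.hcatalog.api.repl.Command;
    import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
    import org.apache.hive.hcatalog.api.repl.commands.DropTableCommand;

    public class WireFormatSketch {
      public static void main(String[] args) throws IOException {
        Command cmd = new DropTableCommand("repldb", "repltable", true, 157);
        String wire = ReplicationUtils.serializeCommand(cmd);   // ship this string across
        Command roundTripped = ReplicationUtils.deserializeCommand(wire);
        for (String hql : roundTripped.get()) {
          System.out.println(hql);  // the HiveQL the receiving side should run
        }
      }
    }
]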
+ */ +package org.apache.hive.hcatalog.api.repl; + +import junit.framework.TestCase; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.junit.Test; + +public class TestReplicationTask extends TestCase{ + private static MessageFactory msgFactory = MessageFactory.getInstance(); + + + @Test + public static void testCreate(){ + Table t = new Table(); + t.setDbName("testdb"); + t.setTableName("testtable"); + NotificationEvent event = new NotificationEvent(0, (int)System.currentTimeMillis(), + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + event.setDbName(t.getDbName()); + event.setTableName(t.getTableName()); + + ReplicationTask.resetFactory(null); + ReplicationTask rtask = ReplicationTask.create(new HCatNotificationEvent(event)); + + assertTrue("Default factory instantiation should yield NoopReplicationTask", rtask instanceof NoopReplicationTask); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java new file mode 100644 index 0000000..05fd89f --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestCommands.java @@ -0,0 +1,585 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.api.repl.commands; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.hcatalog.HcatTestUtils; +import org.apache.hive.hcatalog.api.HCatAddPartitionDesc; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.HCatCreateDBDesc; +import org.apache.hive.hcatalog.api.HCatCreateTableDesc; +import org.apache.hive.hcatalog.api.HCatDatabase; +import org.apache.hive.hcatalog.api.HCatPartition; +import org.apache.hive.hcatalog.api.HCatTable; +import org.apache.hive.hcatalog.api.ObjectNotFoundException; +import org.apache.hive.hcatalog.api.TestHCatClient; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.CommandTestUtils; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestCommands { + + private static Log LOG = LogFactory.getLog(TestCommands.class.getName()); + + private static HiveConf hconf; + private static Driver driver; + private static HCatClient client; + private static String TEST_PATH; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + + TestHCatClient.startMetaStoreServer(); + hconf = TestHCatClient.getConf(); + hconf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,""); + + TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + + TestCommands.class.getCanonicalName() + "-" + System.currentTimeMillis(); + Path testPath = new Path(TEST_PATH); + FileSystem fs = FileSystem.get(testPath.toUri(),hconf); + fs.mkdirs(testPath); + + driver = new Driver(hconf); + SessionState.start(new CliSessionState(hconf)); + client = HCatClient.create(hconf); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestHCatClient.tearDown(); + } + + @Test + public void testDropDatabaseCommand() throws HCatException, CommandNeedRetryException { + String dbName = "cmd_testdb"; + int evid = 999; + Command testCmd = new DropDatabaseCommand(dbName, evid); + + assertEquals(evid,testCmd.getEventId()); + assertEquals(1, testCmd.get().size()); + assertEquals(true,testCmd.isRetriable()); + assertEquals(false,testCmd.isUndoable()); + + CommandTestUtils.testCommandSerialization(testCmd); + + client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + HCatDatabase db = client.getDatabase(dbName); + assertNotNull(db); + + LOG.info("About to
run :"+testCmd.get().get(0)); + driver.run(testCmd.get().get(0)); + + Exception onfe = null; + try { + HCatDatabase db_del = client.getDatabase(dbName); + } catch (Exception e) { + onfe = e; + } + + assertNotNull(onfe); + assertTrue(onfe instanceof ObjectNotFoundException); + } + + @Test + public void testDropTableCommand() throws HCatException, CommandNeedRetryException { + String dbName = "cmd_testdb"; + String tableName = "cmd_testtable"; + int evid = 789; + List cols = HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields(); + + Command testReplicatedDropCmd = new DropTableCommand(dbName,tableName,true,evid); + + assertEquals(evid,testReplicatedDropCmd.getEventId()); + assertEquals(1, testReplicatedDropCmd.get().size()); + assertEquals(true, testReplicatedDropCmd.isRetriable()); + assertEquals(false, testReplicatedDropCmd.isUndoable()); + + CommandTestUtils.testCommandSerialization(testReplicatedDropCmd); + + Command testNormalDropCmd = new DropTableCommand(dbName,tableName,false,evid); + + assertEquals(evid,testNormalDropCmd.getEventId()); + assertEquals(1, testNormalDropCmd.get().size()); + assertEquals(true,testNormalDropCmd.isRetriable()); + assertEquals(false,testNormalDropCmd.isUndoable()); + + CommandTestUtils.testCommandSerialization(testNormalDropCmd); + + client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + + Map tprops = new HashMap(); + tprops.put(ReplicationUtils.REPL_STATE_ID,String.valueOf(evid + 5)); + HCatTable tableToCreate = (new HCatTable(dbName, tableName)).tblProps(tprops).cols(cols); + + client.createTable(HCatCreateTableDesc.create(tableToCreate).build()); + HCatTable t1 = client.getTable(dbName, tableName); + assertNotNull(t1); + + // Test replicated drop, should not drop, because evid < repl.state.id + LOG.info("About to run :"+testReplicatedDropCmd.get().get(0)); + driver.run(testReplicatedDropCmd.get().get(0)); + HCatTable t2 = client.getTable(dbName,tableName); + assertNotNull(t2); + + // Test normal drop, should drop unconditionally. + LOG.info("About to run :"+testNormalDropCmd.get().get(0)); + driver.run(testNormalDropCmd.get().get(0)); + + Exception onfe = null; + try { + HCatTable t_del = client.getTable(dbName, tableName); + } catch (Exception e) { + onfe = e; + } + + assertNotNull(onfe); + assertTrue(onfe instanceof ObjectNotFoundException); + + Map tprops2 = new HashMap(); + tprops2.put(ReplicationUtils.REPL_STATE_ID,String.valueOf(evid - 5)); + HCatTable tableToCreate2 = (new HCatTable(dbName, tableName)).tblProps(tprops2).cols(cols); + + client.createTable(HCatCreateTableDesc.create(tableToCreate2).build()); + HCatTable t3 = client.getTable(dbName, tableName); + assertNotNull(t3); + + // Test replicated drop, should drop this time, since repl.state.id < evid. 
+ LOG.info("About to run :"+testReplicatedDropCmd.get().get(0)); + driver.run(testReplicatedDropCmd.get().get(0)); + + Exception onfe2 = null; + try { + HCatTable t_del = client.getTable(dbName, tableName); + } catch (Exception e) { + onfe2 = e; + } + + assertNotNull(onfe2); + assertTrue(onfe2 instanceof ObjectNotFoundException); + + } + + @Test + public void testDropPartitionCommand() throws HCatException, CommandNeedRetryException { + String dbName = "cmd_testdb"; + String tableName = "cmd_testtable"; + int evid = 789; + + List pcols = HCatSchemaUtils.getHCatSchema("b:string").getFields(); + List cols = HCatSchemaUtils.getHCatSchema("a:int").getFields(); + Map ptnDesc = new HashMap(); + ptnDesc.put("b","test"); + + Command testReplicatedDropPtnCmd = new DropPartitionCommand(dbName, tableName, ptnDesc, true, evid); + + assertEquals(evid,testReplicatedDropPtnCmd.getEventId()); + assertEquals(1, testReplicatedDropPtnCmd.get().size()); + assertEquals(true, testReplicatedDropPtnCmd.isRetriable()); + assertEquals(false, testReplicatedDropPtnCmd.isUndoable()); + + CommandTestUtils.testCommandSerialization(testReplicatedDropPtnCmd); + + Command testNormalDropPtnCmd = new DropPartitionCommand(dbName,tableName, ptnDesc, false, evid); + + assertEquals(evid,testNormalDropPtnCmd.getEventId()); + assertEquals(1, testNormalDropPtnCmd.get().size()); + assertEquals(true,testNormalDropPtnCmd.isRetriable()); + assertEquals(false,testNormalDropPtnCmd.isUndoable()); + + CommandTestUtils.testCommandSerialization(testNormalDropPtnCmd); + + client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + + Map props = new HashMap(); + props.put(ReplicationUtils.REPL_STATE_ID,String.valueOf(evid + 5)); + HCatTable table = (new HCatTable(dbName, tableName)).tblProps(props).cols(cols).partCols(pcols); + + client.createTable(HCatCreateTableDesc.create(table).build()); + HCatTable t = client.getTable(dbName, tableName); + assertNotNull(t); + + HCatPartition ptnToAdd = (new HCatPartition(table, ptnDesc, "")).parameters(props); + client.addPartition(HCatAddPartitionDesc.create(ptnToAdd).build()); + + HCatPartition p1 = client.getPartition(dbName,tableName,ptnDesc); + assertNotNull(p1); + + // Test replicated drop, should not drop, because evid < repl.state.id + LOG.info("About to run :"+testReplicatedDropPtnCmd.get().get(0)); + driver.run(testReplicatedDropPtnCmd.get().get(0)); + HCatPartition p2 = client.getPartition(dbName,tableName,ptnDesc); + assertNotNull(p2); + + // Test normal drop, should drop unconditionally. + LOG.info("About to run :"+testNormalDropPtnCmd.get().get(0)); + driver.run(testNormalDropPtnCmd.get().get(0)); + + Exception onfe = null; + try { + HCatPartition p_del = client.getPartition(dbName,tableName,ptnDesc); + } catch (Exception e) { + onfe = e; + } + + assertNotNull(onfe); + assertTrue(onfe instanceof ObjectNotFoundException); + + Map props2 = new HashMap(); + props2.put(ReplicationUtils.REPL_STATE_ID,String.valueOf(evid - 5)); + + HCatPartition ptnToAdd2 = (new HCatPartition(table, ptnDesc, "")).parameters(props2); + client.addPartition(HCatAddPartitionDesc.create(ptnToAdd2).build()); + + HCatPartition p3 = client.getPartition(dbName,tableName,ptnDesc); + assertNotNull(p3); + + // Test replicated drop, should drop this time, since repl.state.id < evid. 
+ LOG.info("About to run :"+testReplicatedDropPtnCmd.get().get(0)); + driver.run(testReplicatedDropPtnCmd.get().get(0)); + + Exception onfe2 = null; + try { + HCatPartition p_del = client.getPartition(dbName,tableName,ptnDesc); + } catch (Exception e) { + onfe2 = e; + } + + assertNotNull(onfe2); + assertTrue(onfe2 instanceof ObjectNotFoundException); + } + + @Test + public void testDropTableCommand2() throws HCatException, CommandNeedRetryException { + // Secondary DropTableCommand test for testing repl-drop-tables' effect on partitions inside a partitioned table + // when there exist partitions inside the table which are older than the drop event. + // Our goal is this : Create a table t, with repl.last.id=157, say. + // Create 2 partitions inside it, with repl.last.id=150 and 160, say. + // Now, process a drop table command with eventid=155. + // It should result in the table and the partition with repl.last.id=160 continuing to exist, + // but dropping the partition with repl.last.id=150. + + String dbName = "cmd_testdb"; + String tableName = "cmd_testtable"; + int evid = 157; + + List pcols = HCatSchemaUtils.getHCatSchema("b:string").getFields(); + List cols = HCatSchemaUtils.getHCatSchema("a:int").getFields(); + + Command testReplicatedDropCmd = new DropTableCommand(dbName,tableName,true,evid); + + client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + + Map tprops = new HashMap(); + tprops.put(ReplicationUtils.REPL_STATE_ID,String.valueOf(evid + 2)); + HCatTable table = (new HCatTable(dbName, tableName)).tblProps(tprops).cols(cols).partCols(pcols); + + client.createTable(HCatCreateTableDesc.create(table).build()); + HCatTable t = client.getTable(dbName, tableName); + assertNotNull(t); + + Map ptnDesc1 = new HashMap(); + ptnDesc1.put("b","test-older"); + Map props1 = new HashMap(); + props1.put(ReplicationUtils.REPL_STATE_ID,String.valueOf(evid - 5)); + HCatPartition ptnToAdd1 = (new HCatPartition(table, ptnDesc1, "")).parameters(props1); + client.addPartition(HCatAddPartitionDesc.create(ptnToAdd1).build()); + + Map ptnDesc2 = new HashMap(); + ptnDesc2.put("b","test-newer"); + Map props2 = new HashMap(); + props2.put(ReplicationUtils.REPL_STATE_ID, String.valueOf(evid + 5)); + HCatPartition ptnToAdd2 = (new HCatPartition(table, ptnDesc2, "")).parameters(props2); + client.addPartition(HCatAddPartitionDesc.create(ptnToAdd2).build()); + + HCatPartition p1 = client.getPartition(dbName,tableName,ptnDesc1); + assertNotNull(p1); + HCatPartition p2 = client.getPartition(dbName,tableName,ptnDesc2); + assertNotNull(p2); + + LOG.info("About to run :"+testReplicatedDropCmd.get().get(0)); + driver.run(testReplicatedDropCmd.get().get(0)); + + HCatTable t_stillExists = client.getTable(dbName,tableName); + assertNotNull(t_stillExists); + + HCatPartition p2_stillExists = client.getPartition(dbName,tableName,ptnDesc2); + + Exception onfe = null; + try { + HCatPartition p1_del = client.getPartition(dbName,tableName,ptnDesc1); + } catch (Exception e) { + onfe = e; + } + + assertNotNull(onfe); + assertTrue(onfe instanceof ObjectNotFoundException); + } + + + @Test + public void testBasicReplEximCommands() throws IOException, CommandNeedRetryException { + // repl export, has repl.last.id and repl.scope=all in it + // import repl dump, table has repl.last.id on it (will likely be 0) + int evid = 111; + String exportLocation = TEST_PATH + File.separator + "testBasicReplExim"; + Path tempPath = new Path(TEST_PATH 
,"testBasicReplEximTmp"); + String tempLocation = tempPath.toUri().getPath(); + + String dbName = "exim"; + String tableName = "basicSrc"; + String importedTableName = "basicDst"; + List cols = HCatSchemaUtils.getHCatSchema("b:string").getFields(); + + client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + + HCatTable table = (new HCatTable(dbName, tableName)).cols(cols).fileFormat("textfile"); + + client.createTable(HCatCreateTableDesc.create(table).build()); + HCatTable t = client.getTable(dbName, tableName); + assertNotNull(t); + + String[] data = new String[]{ "eleven" , "twelve" }; + + HcatTestUtils.createTestDataFile(tempLocation,data); + + CommandProcessorResponse ret = driver.run( + "LOAD DATA LOCAL INPATH '"+tempLocation+"' OVERWRITE INTO TABLE "+ dbName+ "." + tableName + ); + assertEquals(ret.getResponseCode() + ":" + ret.getErrorMessage(), null, ret.getException()); + + CommandProcessorResponse selectRet = driver.run("SELECT * from " + dbName + "." + tableName); + assertEquals(selectRet.getResponseCode() + ":" + selectRet.getErrorMessage(), + null, selectRet.getException()); + + List values = new ArrayList(); + driver.getResults(values); + + assertEquals(2, values.size()); + assertEquals(data[0],values.get(0)); + assertEquals(data[1],values.get(1)); + + ExportCommand exportCmd = new ExportCommand(dbName,tableName,null, + exportLocation, false, evid); + + LOG.info("About to run :" + exportCmd.get().get(0)); + CommandProcessorResponse ret2 = driver.run(exportCmd.get().get(0)); + assertEquals(ret2.getResponseCode() + ":" + ret2.getErrorMessage(), null, ret2.getException()); + + List exportPaths = exportCmd.cleanupLocationsAfterEvent(); + assertEquals(1,exportPaths.size()); + String metadata = getMetadataContents(exportPaths.get(0)); + LOG.info("Export returned the following _metadata contents:"); + LOG.info(metadata); + assertTrue(metadata + "did not match \"repl.scope\"=\"all\"", metadata.matches(".*\"repl.scope\":\"all\".*")); + assertTrue(metadata + "has \"repl.last.id\"",metadata.matches(".*\"repl.last.id\":.*")); + + ImportCommand importCmd = new ImportCommand(dbName, importedTableName, null, exportLocation, false, evid); + + LOG.info("About to run :" + importCmd.get().get(0)); + CommandProcessorResponse ret3 = driver.run(importCmd.get().get(0)); + assertEquals(ret3.getResponseCode() + ":" + ret3.getErrorMessage(), null, ret3.getException()); + + CommandProcessorResponse selectRet2 = driver.run("SELECT * from " + dbName + "." 
+ importedTableName); + assertEquals(selectRet2.getResponseCode() + ":" + selectRet2.getErrorMessage(), + null, selectRet2.getException()); + + List values2 = new ArrayList(); + driver.getResults(values2); + + assertEquals(2, values2.size()); + assertEquals(data[0],values2.get(0)); + assertEquals(data[1],values2.get(1)); + + HCatTable importedTable = client.getTable(dbName,importedTableName); + assertNotNull(importedTable); + + assertTrue(importedTable.getTblProps().containsKey("repl.last.id")); + } + + @Test + public void testMetadataReplEximCommands() throws IOException, CommandNeedRetryException { + // repl metadata export, has repl.last.id and repl.scope=metadata + // import repl metadata dump, table metadata changed, allows override, has repl.last.id + int evid = 222; + String exportLocation = TEST_PATH + File.separator + "testMetadataReplExim"; + Path tempPath = new Path(TEST_PATH ,"testMetadataReplEximTmp"); + String tempLocation = tempPath.toUri().getPath(); + + String dbName = "exim"; + String tableName = "basicSrc"; + String importedTableName = "basicDst"; + List cols = HCatSchemaUtils.getHCatSchema("b:string").getFields(); + + client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + client.createDatabase(HCatCreateDBDesc.create(dbName).ifNotExists(false).build()); + + HCatTable table = (new HCatTable(dbName, tableName)).cols(cols).fileFormat("textfile"); + + client.createTable(HCatCreateTableDesc.create(table).build()); + HCatTable t = client.getTable(dbName, tableName); + assertNotNull(t); + + String[] data = new String[]{ "eleven" , "twelve" }; + + HcatTestUtils.createTestDataFile(tempLocation,data); + + CommandProcessorResponse ret = driver.run( + "LOAD DATA LOCAL INPATH '"+tempLocation+"' OVERWRITE INTO TABLE "+ dbName+ "." + tableName + ); + assertEquals(ret.getResponseCode() + ":" + ret.getErrorMessage(), null, ret.getException()); + + CommandProcessorResponse selectRet = driver.run("SELECT * from " + dbName + "." + tableName); + assertEquals(selectRet.getResponseCode() + ":" + selectRet.getErrorMessage(), + null, selectRet.getException()); + + List values = new ArrayList(); + driver.getResults(values); + + assertEquals(2, values.size()); + assertEquals(data[0],values.get(0)); + assertEquals(data[1],values.get(1)); + + ExportCommand exportMdCmd = new ExportCommand(dbName,tableName,null, + exportLocation, true, evid); + + LOG.info("About to run :" + exportMdCmd.get().get(0)); + CommandProcessorResponse ret2 = driver.run(exportMdCmd.get().get(0)); + assertEquals(ret2.getResponseCode() + ":" + ret2.getErrorMessage(), null, ret2.getException()); + + List exportPaths = exportMdCmd.cleanupLocationsAfterEvent(); + assertEquals(1,exportPaths.size()); + String metadata = getMetadataContents(exportPaths.get(0)); + LOG.info("Export returned the following _metadata contents:"); + LOG.info(metadata); + assertTrue(metadata + "did not match \"repl.scope\"=\"metadata\"",metadata.matches(".*\"repl.scope\":\"metadata\".*")); + assertTrue(metadata + "has \"repl.last.id\"",metadata.matches(".*\"repl.last.id\":.*")); + + ImportCommand importMdCmd = new ImportCommand(dbName, importedTableName, null, exportLocation, true, evid); + + LOG.info("About to run :" + importMdCmd.get().get(0)); + CommandProcessorResponse ret3 = driver.run(importMdCmd.get().get(0)); + assertEquals(ret3.getResponseCode() + ":" + ret3.getErrorMessage(), null, ret3.getException()); + + CommandProcessorResponse selectRet2 = driver.run("SELECT * from " + dbName + "." 
+ importedTableName); + assertEquals(selectRet2.getResponseCode() + ":" + selectRet2.getErrorMessage(), + null, selectRet2.getException()); + + List values2 = new ArrayList(); + driver.getResults(values2); + + assertEquals(0, values2.size()); + + HCatTable importedTable = client.getTable(dbName,importedTableName); + assertNotNull(importedTable); + + assertTrue(importedTable.getTblProps().containsKey("repl.last.id")); + } + + + @Test + public void testNoopReplEximCommands() throws CommandNeedRetryException, IOException { + // repl noop export on non-existant table, has repl.noop, does not error + // import repl noop dump, no error + + int evid = 333; + String exportLocation = TEST_PATH + File.separator + "testNoopReplExim"; + String dbName = "doesNotExist" + System.currentTimeMillis(); + String tableName = "nope" + System.currentTimeMillis(); + + ExportCommand noopExportCmd = new ExportCommand(dbName,tableName,null, + exportLocation, false, evid); + + LOG.info("About to run :" + noopExportCmd.get().get(0)); + CommandProcessorResponse ret = driver.run(noopExportCmd.get().get(0)); + assertEquals(ret.getResponseCode() + ":" + ret.getErrorMessage(), null, ret.getException()); + + List exportPaths = noopExportCmd.cleanupLocationsAfterEvent(); + assertEquals(1,exportPaths.size()); + String metadata = getMetadataContents(exportPaths.get(0)); + LOG.info("Export returned the following _metadata contents:"); + LOG.info(metadata); + assertTrue(metadata + "did not match \"repl.noop\"=\"true\"",metadata.matches(".*\"repl.noop\":\"true\".*")); + + ImportCommand noopImportCmd = new ImportCommand(dbName, tableName, null, exportLocation, false, evid); + + LOG.info("About to run :" + noopImportCmd.get().get(0)); + CommandProcessorResponse ret2 = driver.run(noopImportCmd.get().get(0)); + assertEquals(ret2.getResponseCode() + ":" + ret2.getErrorMessage(), null, ret2.getException()); + + Exception onfe = null; + try { + HCatDatabase d_doesNotExist = client.getDatabase(dbName); + } catch (Exception e) { + onfe = e; + } + + assertNotNull(onfe); + assertTrue(onfe instanceof ObjectNotFoundException); + } + + private static String getMetadataContents(String exportPath) throws IOException { + Path mdFilePath = new Path(exportPath,"_metadata"); + + FileSystem fs = FileSystem.get(mdFilePath.toUri(), hconf); + assertTrue(mdFilePath.toUri().toString() + "does not exist",fs.exists(mdFilePath)); + + BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(mdFilePath))); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + sb.append(line); + } + reader.close(); + return sb.toString(); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestNoopCommand.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestNoopCommand.java new file mode 100644 index 0000000..e8fefbc --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/commands/TestNoopCommand.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl.commands; + +import junit.framework.TestCase; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.CommandTestUtils; +import org.junit.Test; + +public class TestNoopCommand extends TestCase { + + @Test + public static void testCommand(){ + int evid = 999; + Command testCmd = new NoopCommand(evid); + + assertEquals(evid,testCmd.getEventId()); + assertEquals(0, testCmd.get().size()); + assertEquals(true,testCmd.isRetriable()); + assertEquals(true,testCmd.isUndoable()); + assertEquals(0, testCmd.getUndo().size()); + + CommandTestUtils.testCommandSerialization(testCmd); + } + +} diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java new file mode 100644 index 0000000..183a3ae --- /dev/null +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java @@ -0,0 +1,591 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
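[Editorial note: TestNoopCommand above exercises the part of the Command contract an executor leans on: isUndoable() pairs with getUndo(), and isRetriable() promises a failed command can simply be re-run. A sketch of that decision logic, with run() as a hypothetical HiveQL executor:

    import org.apache.hive.hcatalog.api.repl.Command;

    public class ExecutorPolicySketch {
      static void execute(Command cmd, int retriesLeft) {
        try {
          for (String hql : cmd.get()) {
            run(hql);
          }
        } catch (Exception e) {
          if (cmd.isUndoable()) {
            for (String hql : cmd.getUndo()) {
              run(hql);  // roll back partial effects first
            }
          }
          if (cmd.isRetriable() && retriesLeft > 0) {
            execute(cmd, retriesLeft - 1);  // safe to re-run from the top
          }
        }
      }

      private static void run(String hql) { /* hypothetical executor */ }
    }
]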
+ */
+
+package org.apache.hive.hcatalog.api.repl.exim;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.CommandTestUtils;
+import org.apache.hive.hcatalog.api.repl.NoopReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.apache.hive.hcatalog.api.repl.commands.DropDatabaseCommand;
+import org.apache.hive.hcatalog.api.repl.commands.DropPartitionCommand;
+import org.apache.hive.hcatalog.api.repl.commands.DropTableCommand;
+import org.apache.hive.hcatalog.api.repl.commands.ExportCommand;
+import org.apache.hive.hcatalog.api.repl.commands.ImportCommand;
+import org.apache.hive.hcatalog.api.repl.commands.NoopCommand;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TestEximReplicationTasks extends TestCase {
+
+  private static MessageFactory msgFactory = MessageFactory.getInstance();
+  private static StagingDirectoryProvider stagingDirectoryProvider =
+      new StagingDirectoryProvider.TrivialImpl("/tmp", "/");
+
+  @Before
+  public void setUp() {
+    ReplicationTask.resetFactory(EximReplicationTaskFactory.class);
+  }
+
+  // Dummy mapping used for all db and table name mappings
+  static Function<String, String> debugMapping = new Function<String, String>() {
+    @Nullable
+    @Override
+    public String apply(@Nullable String s) {
+      if (s == null) {
+        return null;
+      } else {
+        StringBuilder sb = new StringBuilder(s);
+        return sb.toString() + sb.reverse().toString();
+      }
+    }
+  };
+
+  @Test
+  public void testDebugMapper() {
+    assertEquals("BlahhalB", debugMapping.apply("Blah"));
+    assertEquals(null, debugMapping.apply(null));
+    assertEquals("", debugMapping.apply(""));
+  }
+
+  @Test
+  public void testCreateDb() {
+    Database db = new Database();
+    db.setName("testdb");
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_CREATE_DATABASE_EVENT, msgFactory.buildCreateDatabaseMessage(db).toString());
+    event.setDbName(db.getName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyCreateDbReplicationTask(rtask); // CREATE DB currently replicated as Noop.
+  }
+
+  private static void verifyCreateDbReplicationTask(ReplicationTask rtask) {
+    assertEquals(CreateDatabaseReplicationTask.class, rtask.getClass());
+    assertTrue("CreateDatabaseReplicationTask should be a noop", rtask instanceof NoopReplicationTask);
+    assertEquals(false, rtask.needsStagingDirs());
+    assertEquals(true, rtask.isActionable());
+    for (Command c : rtask.getSrcWhCommands()) {
+      assertEquals(NoopCommand.class, c.getClass());
+    }
+    for (Command c : rtask.getDstWhCommands()) {
+      assertEquals(NoopCommand.class, c.getClass());
+    }
+  }
+
+  @Test
+  public void testDropDb() throws IOException {
+    Database db = new Database();
+    db.setName("testdb");
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_DROP_DATABASE_EVENT, msgFactory.buildDropDatabaseMessage(db).toString());
+    event.setDbName(db.getName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyDropDbReplicationTask(rtask);
+  }
+
+  private static void verifyDropDbReplicationTask(ReplicationTask rtask) throws IOException {
+    assertEquals(DropDatabaseReplicationTask.class, rtask.getClass());
+    assertEquals(false, rtask.needsStagingDirs());
+    assertEquals(true, rtask.isActionable());
+
+    rtask
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    for (Command c : rtask.getSrcWhCommands()) {
+      assertEquals(NoopCommand.class, c.getClass());
+    }
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(DropDatabaseCommand.class, dstCommands.get(0).getClass());
+
+    DropDatabaseCommand dropDatabaseCommand = new DropDatabaseCommand(
+        debugMapping.apply(rtask.getEvent().getDbName()),
+        rtask.getEvent().getEventId());
+
+    assertEquals(ReplicationUtils.serializeCommand(dropDatabaseCommand),
+        ReplicationUtils.serializeCommand(dstCommands.get(0)));
+  }
+
+  @Test
+  public void testCreateTable() throws IOException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyCreateTableReplicationTask(rtask);
+  }
+
+  private static void verifyCreateTableReplicationTask(ReplicationTask rtask) throws IOException {
+    assertEquals(CreateTableReplicationTask.class, rtask.getClass());
+    assertEquals(true, rtask.needsStagingDirs());
+    assertEquals(false, rtask.isActionable());
+
+    rtask
+        .withSrcStagingDirProvider(stagingDirectoryProvider)
+        .withDstStagingDirProvider(stagingDirectoryProvider)
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    assertEquals(true, rtask.isActionable());
+
+    List<Command> srcCommands = Lists.newArrayList(rtask.getSrcWhCommands());
+    assertEquals(1, srcCommands.size());
+    assertEquals(ExportCommand.class, srcCommands.get(0).getClass());
+
+    ExportCommand exportCommand = getExpectedExportCommand(rtask, null, false);
+
+    assertEquals(ReplicationUtils.serializeCommand(exportCommand),
+        ReplicationUtils.serializeCommand(srcCommands.get(0)));
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(ImportCommand.class, dstCommands.get(0).getClass());
+
+    ImportCommand importCommand = getExpectedImportCommand(rtask, null, false);
+
+    assertEquals(ReplicationUtils.serializeCommand(importCommand),
+        ReplicationUtils.serializeCommand(dstCommands.get(0)));
+  }
+
+  @Test
+  public void testDropTable() throws IOException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyDropTableReplicationTask(rtask);
+  }
+
+  private static void verifyDropTableReplicationTask(ReplicationTask rtask) throws IOException {
+    assertEquals(DropTableReplicationTask.class, rtask.getClass());
+    assertEquals(false, rtask.needsStagingDirs());
+    assertEquals(true, rtask.isActionable());
+
+    rtask
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    for (Command c : rtask.getSrcWhCommands()) {
+      assertEquals(NoopCommand.class, c.getClass());
+    }
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(DropTableCommand.class, dstCommands.get(0).getClass());
+
+    DropTableCommand dropTableCommand = new DropTableCommand(
+        debugMapping.apply(rtask.getEvent().getDbName()),
+        debugMapping.apply(rtask.getEvent().getTableName()),
+        true,
+        rtask.getEvent().getEventId());
+
+    assertEquals(ReplicationUtils.serializeCommand(dropTableCommand),
+        ReplicationUtils.serializeCommand(dstCommands.get(0)));
+  }
+
+  @Test
+  public void testAlterTable() throws IOException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_ALTER_TABLE_EVENT, msgFactory.buildAlterTableMessage(t, t).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyAlterTableReplicationTask(rtask);
+  }
+
+  private static void verifyAlterTableReplicationTask(ReplicationTask rtask) throws IOException {
+    assertEquals(AlterTableReplicationTask.class, rtask.getClass());
+    assertEquals(true, rtask.needsStagingDirs());
+    assertEquals(false, rtask.isActionable());
+
+    rtask
+        .withSrcStagingDirProvider(stagingDirectoryProvider)
+        .withDstStagingDirProvider(stagingDirectoryProvider)
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    assertEquals(true, rtask.isActionable());
+
+    List<Command> srcCommands = Lists.newArrayList(rtask.getSrcWhCommands());
+    assertEquals(1, srcCommands.size());
+    assertEquals(ExportCommand.class, srcCommands.get(0).getClass());
+
+    ExportCommand exportCommand = getExpectedExportCommand(rtask, null, true);
+
+    assertEquals(ReplicationUtils.serializeCommand(exportCommand),
+        ReplicationUtils.serializeCommand(srcCommands.get(0)));
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(ImportCommand.class, dstCommands.get(0).getClass());
+
+    ImportCommand importCommand = getExpectedImportCommand(rtask, null, true);
+
+    assertEquals(ReplicationUtils.serializeCommand(importCommand),
+        ReplicationUtils.serializeCommand(dstCommands.get(0)));
+  }
+
+  @Test
+  public void testAddPartition() throws IOException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    List<FieldSchema> pkeys = HCatSchemaUtils.getFieldSchemas(
+        HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields());
+    t.setPartitionKeys(pkeys);
+
+    List<Partition> addedPtns = new ArrayList<Partition>();
+    addedPtns.add(createPtn(t, Arrays.asList("120", "abc")));
+    addedPtns.add(createPtn(t, Arrays.asList("201", "xyz")));
+
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_ADD_PARTITION_EVENT, msgFactory.buildAddPartitionMessage(t, addedPtns.iterator()).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyAddPartitionReplicationTask(rtask, t, addedPtns);
+  }
+
+  private static void verifyAddPartitionReplicationTask(ReplicationTask rtask, Table table, List<Partition> addedPtns) throws IOException {
+    assertEquals(AddPartitionReplicationTask.class, rtask.getClass());
+    assertEquals(true, rtask.needsStagingDirs());
+    assertEquals(false, rtask.isActionable());
+
+    rtask
+        .withSrcStagingDirProvider(stagingDirectoryProvider)
+        .withDstStagingDirProvider(stagingDirectoryProvider)
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    assertEquals(true, rtask.isActionable());
+
+    List<Command> srcCommands = Lists.newArrayList(rtask.getSrcWhCommands());
+    assertEquals(2, srcCommands.size());
+    assertEquals(ExportCommand.class, srcCommands.get(0).getClass());
+    assertEquals(ExportCommand.class, srcCommands.get(1).getClass());
+
+    ExportCommand exportCommand1 = getExpectedExportCommand(rtask, getPtnDesc(table, addedPtns.get(0)), false);
+    ExportCommand exportCommand2 = getExpectedExportCommand(rtask, getPtnDesc(table, addedPtns.get(1)), false);
+
+    CommandTestUtils.compareCommands(exportCommand1, srcCommands.get(0), true);
+    CommandTestUtils.compareCommands(exportCommand2, srcCommands.get(1), true);
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(2, dstCommands.size());
+    assertEquals(ImportCommand.class, dstCommands.get(0).getClass());
+    assertEquals(ImportCommand.class, dstCommands.get(1).getClass());
+
+    ImportCommand importCommand1 = getExpectedImportCommand(rtask, getPtnDesc(table, addedPtns.get(0)), false);
+    ImportCommand importCommand2 = getExpectedImportCommand(rtask, getPtnDesc(table, addedPtns.get(1)), false);
+
+    CommandTestUtils.compareCommands(importCommand1, dstCommands.get(0), true);
+    CommandTestUtils.compareCommands(importCommand2, dstCommands.get(1), true);
+  }
+
+  private static Map<String, String> getPtnDesc(Table t, Partition p) {
+    assertEquals(t.getPartitionKeysSize(), p.getValuesSize());
+    Map<String, String> retval = new HashMap<String, String>();
+    Iterator<String> pval = p.getValuesIterator();
+    for (FieldSchema fs : t.getPartitionKeys()) {
+      retval.put(fs.getName(), pval.next());
+    }
+    return retval;
+  }
+
+  private static Partition createPtn(Table t, List<String> pvals) {
+    Partition ptn = new Partition();
+    ptn.setDbName(t.getDbName());
+    ptn.setTableName(t.getTableName());
+    ptn.setValues(pvals);
+    return ptn;
+  }
+
+  @Test
+  public void testDropPartition() throws HCatException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    List<FieldSchema> pkeys = HCatSchemaUtils.getFieldSchemas(
+        HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields());
+    t.setPartitionKeys(pkeys);
+
+    Partition p = createPtn(t, Arrays.asList("102", "lmn"));
+
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_DROP_PARTITION_EVENT, msgFactory.buildDropPartitionMessage(t, p).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyDropPartitionReplicationTask(rtask, t, p);
+  }
+
+  private static void verifyDropPartitionReplicationTask(ReplicationTask rtask, Table table, Partition ptn) {
+    assertEquals(DropPartitionReplicationTask.class, rtask.getClass());
+    assertEquals(false, rtask.needsStagingDirs());
+    assertEquals(true, rtask.isActionable());
+
+    rtask
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    assertEquals(true, rtask.isActionable());
+
+    for (Command c : rtask.getSrcWhCommands()) {
+      assertEquals(NoopCommand.class, c.getClass());
+    }
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(DropPartitionCommand.class, dstCommands.get(0).getClass());
+
+    DropPartitionCommand dropPartitionCommand = new DropPartitionCommand(
+        debugMapping.apply(rtask.getEvent().getDbName()),
+        debugMapping.apply(rtask.getEvent().getTableName()),
+        getPtnDesc(table, ptn),
+        true,
+        rtask.getEvent().getEventId()
+    );
+
+    CommandTestUtils.compareCommands(dropPartitionCommand, dstCommands.get(0), true);
+  }
+
+  @Test
+  public void testAlterPartition() throws HCatException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    List<FieldSchema> pkeys = HCatSchemaUtils.getFieldSchemas(
+        HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields());
+    t.setPartitionKeys(pkeys);
+    Partition p = createPtn(t, Arrays.asList("102", "lmn"));
+
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_ALTER_PARTITION_EVENT, msgFactory.buildAlterPartitionMessage(t, p, p).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyAlterPartitionReplicationTask(rtask, t, p);
+  }
+
+  private static void verifyAlterPartitionReplicationTask(ReplicationTask rtask, Table table, Partition ptn) {
+    assertEquals(AlterPartitionReplicationTask.class, rtask.getClass());
+    assertEquals(true, rtask.needsStagingDirs());
+    assertEquals(false, rtask.isActionable());
+
+    rtask
+        .withSrcStagingDirProvider(stagingDirectoryProvider)
+        .withDstStagingDirProvider(stagingDirectoryProvider)
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    assertEquals(true, rtask.isActionable());
+
+    List<Command> srcCommands = Lists.newArrayList(rtask.getSrcWhCommands());
+    assertEquals(1, srcCommands.size());
+    assertEquals(ExportCommand.class, srcCommands.get(0).getClass());
+
+    ExportCommand exportCommand = getExpectedExportCommand(rtask, getPtnDesc(table, ptn), true);
+    CommandTestUtils.compareCommands(exportCommand, srcCommands.get(0), true);
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(ImportCommand.class, dstCommands.get(0).getClass());
+
+    ImportCommand importCommand = getExpectedImportCommand(rtask, getPtnDesc(table, ptn), true);
+    CommandTestUtils.compareCommands(importCommand, dstCommands.get(0), true);
+  }
+
+  @Test
+  public void testInsert() throws HCatException {
+    Table t = new Table();
+    t.setDbName("testdb");
+    t.setTableName("testtable");
+    List<FieldSchema> pkeys = HCatSchemaUtils.getFieldSchemas(
+        HCatSchemaUtils.getHCatSchema("a:int,b:string").getFields());
+    t.setPartitionKeys(pkeys);
+    Partition p = createPtn(t, Arrays.asList("102", "lmn"));
+    List<String> files = Arrays.asList("/tmp/test123");
+
+    NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+        HCatConstants.HCAT_INSERT_EVENT, msgFactory.buildInsertMessage(
+        t.getDbName(),
+        t.getTableName(),
+        getPtnDesc(t, p),
+        files
+    ).toString());
+    event.setDbName(t.getDbName());
+    event.setTableName(t.getTableName());
+
+    HCatNotificationEvent hev = new HCatNotificationEvent(event);
+    ReplicationTask rtask = ReplicationTask.create(hev);
+
+    assertEquals(hev.toString(), rtask.getEvent().toString());
+    verifyInsertReplicationTask(rtask, t, p);
+  }
+
+  private static void verifyInsertReplicationTask(ReplicationTask rtask, Table table, Partition ptn) {
+    assertEquals(InsertReplicationTask.class, rtask.getClass());
+    assertEquals(true, rtask.needsStagingDirs());
+    assertEquals(false, rtask.isActionable());
+
+    rtask
+        .withSrcStagingDirProvider(stagingDirectoryProvider)
+        .withDstStagingDirProvider(stagingDirectoryProvider)
+        .withDbNameMapping(debugMapping)
+        .withTableNameMapping(debugMapping);
+
+    assertEquals(true, rtask.isActionable());
+
+    List<Command> srcCommands = Lists.newArrayList(rtask.getSrcWhCommands());
+    assertEquals(1, srcCommands.size());
+    assertEquals(ExportCommand.class, srcCommands.get(0).getClass());
+
+    ExportCommand exportCommand = getExpectedExportCommand(rtask, getPtnDesc(table, ptn), false);
+    CommandTestUtils.compareCommands(exportCommand, srcCommands.get(0), true);
+
+    List<Command> dstCommands = Lists.newArrayList(rtask.getDstWhCommands());
+    assertEquals(1, dstCommands.size());
+    assertEquals(ImportCommand.class, dstCommands.get(0).getClass());
+
+    ImportCommand importCommand = getExpectedImportCommand(rtask, getPtnDesc(table, ptn), false);
+    CommandTestUtils.compareCommands(importCommand, dstCommands.get(0), true);
+  }
+
+  private static long getEventId() {
+    // Does not need to be unique, just a non-zero distinct value to test against.
+    return 42;
+  }
+
+  private static int getTime() {
+    // Does not need to be actual time, just a non-zero distinct value to test against.
+    return 1729;
+  }
+
+  private static ImportCommand getExpectedImportCommand(ReplicationTask rtask, Map<String, String> ptnDesc, boolean isMetadataOnly) {
+    String dbName = rtask.getEvent().getDbName();
+    String tableName = rtask.getEvent().getTableName();
+    return new ImportCommand(
+        ReplicationUtils.mapIfMapAvailable(dbName, debugMapping),
+        ReplicationUtils.mapIfMapAvailable(tableName, debugMapping),
+        ptnDesc,
+        stagingDirectoryProvider.getStagingDirectory(
+            ReplicationUtils.getUniqueKey(
+                rtask.getEvent().getEventId(),
+                dbName,
+                tableName,
+                ptnDesc)
+        ),
+        isMetadataOnly,
+        rtask.getEvent().getEventId()
+    );
+  }
+
+  private static ExportCommand getExpectedExportCommand(ReplicationTask rtask, Map<String, String> ptnDesc, boolean isMetadataOnly) {
+    String dbName = rtask.getEvent().getDbName();
+    String tableName = rtask.getEvent().getTableName();
+    return new ExportCommand(
+        dbName,
+        tableName,
+        ptnDesc,
+        stagingDirectoryProvider.getStagingDirectory(
+            ReplicationUtils.getUniqueKey(
+                rtask.getEvent().getEventId(),
+                dbName,
+                tableName,
+                ptnDesc)
+        ),
+        isMetadataOnly,
+        rtask.getEvent().getEventId()
+    );
+  }
+
+}
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 8ef5394..91cc03e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -263,9 +263,10 @@ public void alterPartition() throws Exception {
     assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
     assertEquals("default", event.getDbName());
     assertEquals("alterparttable", event.getTableName());
-    assertTrue(event.getMessage().matches( "\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," +
+    assertTrue(event.getMessage(),
+        event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," +
         "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," +
-        "\"timestamp\":[0-9]+,\"values\":\\[\"today\"]}"));
+        "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}"));
   }
 
   @Test
@@ -303,55 +304,81 @@
   @Test
   public void insertTable() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("insertTable", "default", "me", startTime, startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+
     FireEventRequestData data = new FireEventRequestData();
     InsertEventRequestData insertData = new InsertEventRequestData();
     data.setInsertData(insertData);
     insertData.addToFilesAdded("/warehouse/mytable/b1");
     FireEventRequest rqst = new FireEventRequest(true, data);
-    rqst.setDbName("mydb");
-    rqst.setTableName("mytable");
+    rqst.setDbName("default");
+    rqst.setTableName("insertTable");
     msClient.fireListenerEvent(rqst);
 
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-    assertEquals(1, rsp.getEventsSize());
+    assertEquals(2, rsp.getEventsSize());
 
-    NotificationEvent event = rsp.getEvents().get(0);
-    assertEquals(firstEventId + 1, event.getEventId());
+    NotificationEvent event = rsp.getEvents().get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertEquals("mydb", event.getDbName());
-    assertEquals("mytable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"table\":" +
-        "\"mytable\",\"timestamp\":[0-9]+,\"partitionValues\":null," +
-        "\"files\":\\[\"/warehouse/mytable/b1\"]}"));
+    assertEquals("default", event.getDbName());
+    assertEquals("insertTable", event.getTableName());
+    assertTrue(event.getMessage(),
+        event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"insertTable\",\"timestamp\":[0-9]+,\"files\":\\[\"/warehouse/mytable/b1\"]," +
+        "\"partKeyVals\":\\{},\"partitionKeyValues\":\\{}}"));
   }
 
   @Test
   public void insertPartition() throws Exception {
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table table = new Table("insertPartition", "default", "me", startTime, startTime, 0, sd,
+        partCols, emptyParameters, null, null, null);
+    msClient.createTable(table);
+    Partition partition = new Partition(Arrays.asList("today"), "default", "insertPartition",
+        startTime, startTime, sd, emptyParameters);
+    msClient.add_partition(partition);
+
     FireEventRequestData data = new FireEventRequestData();
     InsertEventRequestData insertData = new InsertEventRequestData();
     data.setInsertData(insertData);
     insertData.addToFilesAdded("/warehouse/mytable/today/b1");
     FireEventRequest rqst = new FireEventRequest(true, data);
-    rqst.setDbName("mydb");
-    rqst.setTableName("mytable");
+    rqst.setDbName("default");
+    rqst.setTableName("insertPartition");
     rqst.setPartitionVals(Arrays.asList("today"));
     msClient.fireListenerEvent(rqst);
 
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-    assertEquals(1, rsp.getEventsSize());
+    assertEquals(3, rsp.getEventsSize());
 
-    NotificationEvent event = rsp.getEvents().get(0);
-    assertEquals(firstEventId + 1, event.getEventId());
+    NotificationEvent event = rsp.getEvents().get(2);
+    assertEquals(firstEventId + 3, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
-    assertEquals("mydb", event.getDbName());
-    assertEquals("mytable", event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"table\":" +
-        "\"mytable\",\"timestamp\":[0-9]+,\"partitionValues\":\\[\"today\"]," +
-        "\"files\":\\[\"/warehouse/mytable/today/b1\"]}"));
+    assertEquals("default", event.getDbName());
+    assertEquals("insertPartition", event.getTableName());
+    assertTrue(event.getMessage(),
+        event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"insertPartition\",\"timestamp\":[0-9]+," +
+        "\"files\":\\[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":\\{\"ds\":\"today\"}," +
+        "\"partitionKeyValues\":\\{\"ds\":\"today\"}}"));
   }
 
   @Test
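
The two regex assertions above pin down the message-shape change this patch makes: INSERT notifications now describe the partition as a JSON object keyed by partition column ("partitionKeyValues", alongside "partKeyVals"), rather than the old positional "partitionValues" array. The snippet below is a minimal, self-contained illustration of that shape and is not part of the patch; the message body is a hypothetical example mirroring the format asserted in insertPartition().

    public class InsertMessageShapeCheck {
      public static void main(String[] args) {
        // Hypothetical INSERT message body in the new format.
        String msg = "{\"eventType\":\"INSERT\",\"server\":\"\",\"servicePrincipal\":\"\","
            + "\"db\":\"default\",\"table\":\"insertPartition\",\"timestamp\":1729,"
            + "\"files\":[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":{\"ds\":\"today\"},"
            + "\"partitionKeyValues\":{\"ds\":\"today\"}}";
        // The partition is identified by key ("ds"), not by position in a value list,
        // so a consumer no longer needs the table schema to interpret the values.
        System.out.println(msg.matches(
            ".*\"partitionKeyValues\":\\{\"ds\":\"today\"}}.*")); // prints true
      }
    }
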
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index be66cbe..2a313b0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -19,13 +19,14 @@
 package org.apache.hadoop.hive.metastore.events;
 
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 
-import java.util.Arrays;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class InsertEvent extends ListenerEvent {
 
@@ -33,24 +34,30 @@
   // we have just the string names, but that's fine for what we need.
   private final String db;
   private final String table;
-  private final List<String> partVals;
+  private final Map<String, String> keyValues;
   private final List<String> files;
 
   /**
    *
    * @param db name of the database the table is in
    * @param table name of the table being inserted into
-   * @param partitions list of partition values, can be null
+   * @param partVals list of partition values, can be null
    * @param status status of insert, true = success, false = failure
    * @param handler handler that is firing the event
    */
-  public InsertEvent(String db, String table, List<String> partitions, List<String> files,
-      boolean status, HMSHandler handler) {
+  public InsertEvent(String db, String table, List<String> partVals, List<String> files,
+      boolean status, HMSHandler handler) throws MetaException, NoSuchObjectException {
     super(status, handler);
     this.db = db;
    this.table = table;
-    this.partVals = partitions;
     this.files = files;
+    Table t = handler.get_table(db, table);
+    keyValues = new LinkedHashMap<String, String>();
+    if (partVals != null) {
+      for (int i = 0; i < partVals.size(); i++) {
+        keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i));
+      }
+    }
   }
 
   public String getDb() {
@@ -64,10 +71,10 @@ public String getTable() {
   }
 
   /**
-   * @return List of partitions.
+   * @return Map of partition key names to values.
    */
-  public List<String> getPartitions() {
-    return partVals;
+  public Map<String, String> getPartitionKeyValues() {
+    return keyValues;
   }
 
   /**
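
For context on the InsertEvent change above: the constructor now looks the table up through the handler and zips the positional partition value list against the table's partition keys into a LinkedHashMap, so downstream consumers receive an ordered key/value description. The helper below is a hypothetical sketch (not in the patch) of how such a map could be rendered as the PARTITION clause that EXPORT/IMPORT-style replication commands need; it assumes simple single-quoted string values and does no escaping.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionSpecSketch {
      // Hypothetical helper: render a key/value map like the one returned by
      // InsertEvent.getPartitionKeyValues() as a PARTITION (...) clause.
      static String toPartitionSpec(Map<String, String> keyValues) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : keyValues.entrySet()) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(e.getKey()).append("='").append(e.getValue()).append("'");
        }
        return "PARTITION (" + sb + ")";
      }

      public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, which is why InsertEvent uses it:
        // the rendered spec lists keys in the table's partition-key order.
        Map<String, String> kv = new LinkedHashMap<String, String>();
        kv.put("ds", "today");
        kv.put("region", "us");
        System.out.println(toPartitionSpec(kv)); // PARTITION (ds='today', region='us')
      }
    }
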