diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 766d858..4ffc7fb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
@@ -589,6 +590,72 @@ public void testIncrementalAdds() throws IOException {
   }
 
   @Test
+  public void testIncrementalLoadWithVariableLengthEventId() throws IOException, TException {
+    String testName = "incrementalLoadWithVariableLengthEventId";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    String[] empty = new String[] {};
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    // Each insert generates three events, so this loop generates 15 events in total.
+    for (int i = 0; i < 5; i++) {
+      run("INSERT INTO TABLE " + dbName + ".unptned values('" + Integer.toString(i) + "')");
+    }
+
+    verifySetup("SELECT count(a) from " + dbName + ".unptned", new String[] {"5"});
+
+    // Truncate at the end so that the data set is empty after the incremental load.
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    verifySetup("SELECT a from " + dbName + ".unptned", empty);
+
+    // Inject a behaviour where the event ID starts at 90 and hence overflows across the 100 boundary.
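+    // Each event is dumped into a sub-directory named after its ID, so a plain lexicographic
+    // sort of the directory names would place "100" before "99".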
+    // The injection thus produces event IDs of variable length in the incremental dump.
+    BehaviourInjection<NotificationEventResponse, NotificationEventResponse> eventIdModifier
+            = new BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
+      private int eventIdIter = 90;
+      @Nullable
+      @Override
+      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+        if (null != eventIdList) {
+          List<NotificationEvent> eventIds = eventIdList.getEvents();
+          for (int i = 0; i < eventIds.size(); i++) {
+            eventIds.get(i).setEventId(eventIdIter++);
+          }
+          injectionPathCalled = true;
+          return new NotificationEventResponse(eventIds);
+        } else {
+          return eventIdList;
+        }
+      }
+    };
+    InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdModifier);
+
+    advanceDumpDir();
+    String cmd = "REPL DUMP " + dbName + " FROM " + replDumpId;
+    if (metaStoreClient.getCurrentNotificationEventId().getEventId() < 150) {
+      cmd = cmd + " TO 150";
+    }
+    run(cmd);
+    eventIdModifier.assertInjectionsPerformed(true, false);
+    InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty);
+  }
+
+  @Test
   public void testDrops() throws IOException {
     String name = testName.getMethodName();
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index ed6d4be..7998240 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -20,6 +20,8 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import static org.junit.Assert.assertEquals;
@@ -53,7 +55,10 @@ public void assertInjectionsPerformed(
       com.google.common.base.Functions.identity();
   private static com.google.common.base.Function<List<String>, List<String>> listPartitionNamesModifier =
       com.google.common.base.Functions.identity();
+  private static com.google.common.base.Function<NotificationEventResponse, NotificationEventResponse>
+      getNextNotificationModifier = com.google.common.base.Functions.identity();
 
+  // Methods to set/get/reset the getTable modifier
   public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){
     getTableModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
   }
@@ -66,6 +71,7 @@ public static void resetGetTableBehaviour(){
     return getTableModifier;
   }
 
+  // Methods to set/get/reset the listPartitionNames modifier
   public static void setListPartitionNamesBehaviour(com.google.common.base.Function<List<String>, List<String>> modifier){
     listPartitionNamesModifier = (modifier == null)?
         com.google.common.base.Functions.identity() : modifier;
   }
@@ -78,6 +84,22 @@ public static void resetListPartitionNamesBehaviour(){
     return listPartitionNamesModifier;
   }
 
+  // Methods to set/get/reset the getNextNotification modifier
+  public static void setGetNextNotificationBehaviour(
+      com.google.common.base.Function<NotificationEventResponse, NotificationEventResponse> modifier){
+    getNextNotificationModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
+  }
+
+  public static void resetGetNextNotificationBehaviour(){
+    setGetNextNotificationBehaviour(null);
+  }
+
+  public static com.google.common.base.Function<NotificationEventResponse, NotificationEventResponse>
+      getGetNextNotificationBehaviour() {
+    return getNextNotificationModifier;
+  }
+
+  // ObjectStore methods to be overridden with injected behaviour
   @Override
   public Table getTable(String dbName, String tableName) throws MetaException {
     return getTableModifier.apply(super.getTable(dbName, tableName));
@@ -87,4 +109,9 @@ public Table getTable(String dbName, String tableName) throws MetaException {
   public List<String> listPartitionNames(String dbName, String tableName, short max) throws MetaException {
     return listPartitionNamesModifier.apply(super.listPartitionNames(dbName, tableName, max));
   }
+
+  @Override
+  public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
+    return getNextNotificationModifier.apply(super.getNextNotification(rqst));
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adcdc12..961561d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
@@ -555,8 +556,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
       }
     } else {
-      // event dump, each subdir is an individual event dump.
-      Arrays.sort(dirsInLoadPath); // we need to guarantee that the directory listing we got is in order of evid.
+      // Event dump, each sub-dir is an individual event dump.
+      // We need to guarantee that the directory listing we got is in order of event ID.
+      Arrays.sort(dirsInLoadPath, new EventDumpDirComparator());
       Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf);
       Task<? extends Serializable> taskChainTail = evTaskRoot;
@@ -572,6 +574,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           loadPath.toUri().toString());
       for (FileStatus dir : dirsInLoadPath){
         LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
+
         // event loads will behave similar to table loads, with one crucial difference
         // precursor order is strict, and each event must be processed after the previous one.
         // The way we handle this strict order is as follows:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/EventDumpDirComparator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/EventDumpDirComparator.java
new file mode 100644
index 0000000..6bb1413
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/EventDumpDirComparator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.fs.FileStatus;
+
+public class EventDumpDirComparator implements Comparator<FileStatus> {
+  @Override
+  public int compare(FileStatus o1, FileStatus o2) {
+    // It is enough to compare the last-level sub-directory, whose name is the event ID.
+    String dir1 = o1.getPath().getName();
+    String dir2 = o2.getPath().getName();
+
+    // Event IDs carry no leading zeros, so a shorter name is always numerically smaller:
+    // compare lengths first, then compare names of equal length lexicographically.
+    if (dir1.length() > dir2.length()) {
+      return 1;
+    } else if (dir1.length() < dir2.length()) {
+      return -1;
+    }
+    return dir1.compareTo(dir2);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestEventDumpDirComparator.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestEventDumpDirComparator.java
new file mode 100644
index 0000000..791b7c8
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestEventDumpDirComparator.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventDumpDirComparator {
+
+  @Test
+  public void fileStatusArraySortingWithEventDumpDirComparator() {
+    // Fill even slots with odd-numbered dir names and odd slots with even-numbered ones,
+    // so the array starts out unsorted.
+    FileStatus[] dirList = new FileStatus[30];
+    for (int i = 0; i < 30; i += 2) {
+      dirList[i] = new FileStatus(5, true, 1, 64, 100,
+              new Path("hdfs://tmp/" + Integer.toString(i + 1)));
+    }
+    for (int i = 1; i < 30; i += 2) {
+      dirList[i] = new FileStatus(5, true, 1, 64, 100,
+              new Path("hdfs://tmp/" + Integer.toString(i - 1)));
+    }
+
+    // After sorting, the directory names should be in increasing numeric order 0..29.
+    Arrays.sort(dirList, new EventDumpDirComparator());
+    for (int i = 0; i < 30; i++) {
+      assertTrue(dirList[i].getPath().getName().equalsIgnoreCase(Integer.toString(i)));
+    }
+  }
+}
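
For reviewers who want to see the ordering issue in isolation: a minimal standalone sketch (the class name EventIdOrderingDemo and the sample IDs are illustrative, not part of this patch) contrasting a plain lexicographic sort with the length-first ordering that EventDumpDirComparator applies.

import java.util.Arrays;
import java.util.Comparator;

public class EventIdOrderingDemo {
  public static void main(String[] args) {
    // Dump sub-directory names produced by events 98..102.
    String[] dirs = {"100", "101", "102", "98", "99"};

    // A plain lexicographic sort mis-orders the IDs across the 100 boundary.
    Arrays.sort(dirs);
    System.out.println(Arrays.toString(dirs));   // [100, 101, 102, 98, 99]

    // Comparing length first (as EventDumpDirComparator does) restores numeric order,
    // because event IDs carry no leading zeros.
    Arrays.sort(dirs, Comparator.<String>comparingInt(String::length)
        .thenComparing(Comparator.naturalOrder()));
    System.out.println(Arrays.toString(dirs));   // [98, 99, 100, 101, 102]
  }
}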