diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 766d858..a3dc667 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -21,13 +21,14 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
@@ -589,6 +590,113 @@ public void testIncrementalAdds() throws IOException {
   }
 
   @Test
+  public void testIncrementalLoadWithVariableLengthEventId() throws IOException, TException {
+    String testName = "incrementalLoadWithVariableLengthEventId";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('ten')");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    // CREATE_TABLE - TRUNCATE - INSERT: the result is just one record.
+    // Create a dummy table so that the event ID of TRUNCATE is not 10 or 100 or 1000 etc.
+    String[] unptn_data = new String[]{ "eleven" };
+    run("CREATE TABLE " + dbName + ".dummy(a string) STORED AS TEXTFILE");
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+
+    // Inject a behaviour where all events get an ID less than 100, except INSERT which gets ID 100.
+    // This ensures event IDs of variable length in the incremental dump.
+    BehaviourInjection<NotificationEventResponse, NotificationEventResponse> eventIdModifier
+            = new BehaviourInjection<NotificationEventResponse, NotificationEventResponse>(){
+      private long nextEventId = 0; // Initialized to 0 as, for an incremental dump, 0 won't be used.
+
+      @Nullable
+      @Override
+      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+        if (null != eventIdList) {
+          List<NotificationEvent> eventIds = eventIdList.getEvents();
+          List<NotificationEvent> outEventIds = new ArrayList<NotificationEvent>();
+          for (int i = 0; i < eventIds.size(); i++) {
+            NotificationEvent event = eventIds.get(i);
+
+            // Skip all the events belonging to other DBs/tables.
+            if (event.getDbName().equalsIgnoreCase(dbName)) {
+              // We will encounter CREATE_TABLE and TRUNCATE followed by INSERT.
+              // For the INSERT, set a longer event ID so that the old comparator picks INSERT before TRUNCATE.
+              // Eg: Event IDs CREATE_TABLE - 5, TRUNCATE - 9, INSERT - 12 changed to
+              // CREATE_TABLE - 5, TRUNCATE - 9, INSERT - 100
+              // But if TRUNCATE has ID 10, then giving INSERT ID 100 won't be sufficient to test the scenario.
+              // So, any event that comes after CREATE_TABLE starts at 20.
+              // Eg: Event IDs CREATE_TABLE - 5, TRUNCATE - 10, INSERT - 12 changed to
+              // CREATE_TABLE - 5, TRUNCATE - 20 (20 <= Id < 100), INSERT - 100
+              switch (event.getEventType()) {
+                case "CREATE_TABLE": {
+                  // The next ID is set to 20 or 200 or 2000 ... based on the length of the current event ID.
+                  // This is done to ensure TRUNCATE doesn't get an ID of 10 or 100 ...
+                  nextEventId = (long) Math.pow(10.0, (double) String.valueOf(event.getEventId()).length()) * 2;
+                  break;
+                }
+                case "INSERT": {
+                  // INSERT will always come after CREATE_TABLE and TRUNCATE, so there is no need to validate nextEventId.
+                  nextEventId = (long) Math.pow(10.0, (double) String.valueOf(nextEventId).length());
+                  LOG.info("Changed EventId #{} to #{}", event.getEventId(), nextEventId);
+                  event.setEventId(nextEventId++);
+                  break;
+                }
+                default: {
+                  // After CREATE_TABLE, all the events in this DB should get an ID >= 20 or 200 ...
+                  if (nextEventId > 0) {
+                    LOG.info("Changed EventId #{} to #{}", event.getEventId(), nextEventId);
+                    event.setEventId(nextEventId++);
+                  }
+                  break;
+                }
+              }
+
+              outEventIds.add(event);
+            }
+          }
+          injectionPathCalled = true;
+          if (outEventIds.isEmpty()) {
+            return eventIdList; // If not even one event belongs to the current DB, then return the original list itself.
+          } else {
+            // If the new list is not empty (the input list has some events from this DB), then return it.
+            return new NotificationEventResponse(outEventIds);
+          }
+        } else {
+          return null;
+        }
+      }
+    };
+    InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdModifier);
+
+    // It is possible that currentNotificationEventID from the metastore is less than the event ID newly set by the stub function.
+    // In that case, REPL DUMP will skip events beyond this upper limit.
+    // So, to avoid this failure, set the TO clause to 100 times currentNotificationEventID.
+    String cmd = "REPL DUMP " + dbName + " FROM " + replDumpId
+            + " TO " + String.valueOf(metaStoreClient.getCurrentNotificationEventId().getEventId() * 100);
+
+    advanceDumpDir();
+    run(cmd);
+    eventIdModifier.assertInjectionsPerformed(true, false);
+    InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
+  }
+
+  @Test
   public void testDrops() throws IOException {
     String name = testName.getMethodName();
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index ed6d4be..d89c54c 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -20,6 +20,8 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import static org.junit.Assert.assertEquals;
@@ -53,7 +55,10 @@ public void assertInjectionsPerformed(
       com.google.common.base.Functions.identity();
   private static com.google.common.base.Function<List<String>, List<String>> listPartitionNamesModifier =
       com.google.common.base.Functions.identity();
+  private static com.google.common.base.Function<NotificationEventResponse, NotificationEventResponse> getNextNotificationModifier = com.google.common.base.Functions.identity();
+
+  // Methods to set/reset getTable modifier
   public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){
     getTableModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
   }
@@ -62,10 +67,7 @@ public static void resetGetTableBehaviour(){
     setGetTableBehaviour(null);
   }
 
-  public static com.google.common.base.Function<Table, Table> getGetTableBehaviour() {
-    return getTableModifier;
-  }
-
+  // Methods to set/reset listPartitionNames modifier
   public static void setListPartitionNamesBehaviour(com.google.common.base.Function<List<String>, List<String>> modifier){
     listPartitionNamesModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
   }
@@ -74,10 +76,17 @@ public static void resetListPartitionNamesBehaviour(){
     setListPartitionNamesBehaviour(null);
   }
 
-  public static com.google.common.base.Function<List<String>, List<String>> getListPartitionNamesBehaviour() {
-    return listPartitionNamesModifier;
+  // Methods to set/reset getNextNotification modifier
+  public static void setGetNextNotificationBehaviour(
+      com.google.common.base.Function<NotificationEventResponse, NotificationEventResponse> modifier){
+    getNextNotificationModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
+  }
+
+  public static void resetGetNextNotificationBehaviour(){
+    setGetNextNotificationBehaviour(null);
   }
 
+  // ObjectStore methods to be overridden with injected behavior
   @Override
   public Table getTable(String dbName, String tableName) throws MetaException {
     return getTableModifier.apply(super.getTable(dbName, tableName));
@@ -87,4 +96,9 @@ public Table getTable(String dbName, String tableName) throws MetaException {
   public List<String> listPartitionNames(String dbName, String tableName, short max) throws MetaException {
     return listPartitionNamesModifier.apply(super.listPartitionNames(dbName, tableName, max));
   }
+
+  @Override
+  public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
+    return getNextNotificationModifier.apply(super.getNextNotification(rqst));
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adcdc12..961561d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
@@ -555,8 +556,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
       }
     } else {
-      // event dump, each subdir is an individual event dump.
-      Arrays.sort(dirsInLoadPath); // we need to guarantee that the directory listing we got is in order of evid.
+      // Event dump, each sub-dir is an individual event dump.
+      // We need to guarantee that the directory listing we got is in order of evid.
+      Arrays.sort(dirsInLoadPath, new EventDumpDirComparator());
       Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf);
       Task<? extends Serializable> taskChainTail = evTaskRoot;
 
@@ -572,6 +574,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           loadPath.toUri().toString());
       for (FileStatus dir : dirsInLoadPath){
         LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
+
         // event loads will behave similar to table loads, with one crucial difference
         // precursor order is strict, and each event must be processed after the previous one.
         // The way we handle this strict order is as follows:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/EventDumpDirComparator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/EventDumpDirComparator.java
new file mode 100644
index 0000000..6bb1413
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/EventDumpDirComparator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.Comparator;
+import org.apache.hadoop.fs.FileStatus;
+
+public class EventDumpDirComparator implements Comparator<FileStatus> {
+  @Override
+  public int compare(FileStatus o1, FileStatus o2) {
+    // It is enough to compare the last-level sub-directory, whose name is the event ID.
+    String dir1 = o1.getPath().getName();
+    String dir2 = o2.getPath().getName();
+
+    // First compare the lengths and then compare the directory names.
+    if (dir1.length() > dir2.length()) {
+      return 1;
+    } else if (dir1.length() < dir2.length()) {
+      return -1;
+    }
+    return dir1.compareTo(dir2);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestEventDumpDirComparator.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestEventDumpDirComparator.java
new file mode 100644
index 0000000..58a6d4d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/TestEventDumpDirComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import java.util.Arrays;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestEventDumpDirComparator {
+
+  @Test
+  public void fileStatusArraySortingWithEventDumpDirComparator() {
+    int size = 30;
+    FileStatus[] dirList = new FileStatus[size];
+
+    for (int i = 0; i < size; i++){
+      dirList[i] = new FileStatus(5, true, 1, 64, 100,
+          new Path("hdfs://tmp/" + Integer.toString(size - i)));
+    }
+
+    Arrays.sort(dirList, new EventDumpDirComparator());
+    for (int i = 0; i < size; i++){
+      assertTrue(dirList[i].getPath().getName().equalsIgnoreCase(Integer.toString(i + 1)));
+    }
+  }
+}
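
Note on the ordering fix above: event dump sub-directories are named by numeric event ID, and the replaced Arrays.sort(dirsInLoadPath) compared those names lexicographically, so once IDs cross a power of ten the events replay out of order ("100" sorts before "99"). EventDumpDirComparator compares the name length first and falls back to lexicographic comparison, which is exactly numeric order for non-negative integers without leading zeros. A minimal standalone sketch of that rule, not part of the patch (class and variable names are illustrative only, and plain strings stand in for FileStatus):

// Sketch: why lexicographic sorting of event-ID directory names breaks,
// and how the length-then-name rule restores numeric order.
import java.util.Arrays;
import java.util.Comparator;

public class EventIdOrderingSketch {
  public static void main(String[] args) {
    // Hypothetical last-level event dump directory names spanning a power of ten.
    String[] eventDirs = {"9", "12", "100", "5", "20"};

    String[] lexicographic = eventDirs.clone();
    Arrays.sort(lexicographic);
    // Prints [100, 12, 20, 5, 9]: event 100 would be replayed before event 9.
    System.out.println(Arrays.toString(lexicographic));

    // Same rule as EventDumpDirComparator: shorter name sorts first; equal
    // lengths fall back to lexicographic comparison. For non-negative integers
    // without leading zeros this is exactly numeric order.
    Comparator<String> byEventId = (a, b) ->
        a.length() != b.length() ? Integer.compare(a.length(), b.length()) : a.compareTo(b);

    String[] numeric = eventDirs.clone();
    Arrays.sort(numeric, byEventId);
    // Prints [5, 9, 12, 20, 100].
    System.out.println(Arrays.toString(numeric));
  }
}

The same ordering could be obtained by parsing the names with Long.parseLong, but comparing lengths first avoids parsing altogether; it is correct only because event IDs are decimal counters written without leading zeros, which the dump directory layout guarantees.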