diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index f67fc817d1..546678beb5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -117,7 +117,8 @@ private String replicateAndVerify(String replPolicy, String lastReplId, List dumpWithClause, List loadWithClause, String[] expectedTables) throws Throwable { - return replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, loadWithClause, null, expectedTables); + return replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, loadWithClause, + null, expectedTables); } private String replicateAndVerify(String replPolicy, String oldReplPolicy, String lastReplId, @@ -125,6 +126,16 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin List loadWithClause, String[] bootstrappedTables, String[] expectedTables) throws Throwable { + return replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, dumpWithClause, loadWithClause, + bootstrappedTables, expectedTables, null); + } + + private String replicateAndVerify(String replPolicy, String oldReplPolicy, String lastReplId, + List dumpWithClause, + List loadWithClause, + String[] bootstrappedTables, + String[] expectedTables, + String[] records) throws Throwable { if (dumpWithClause == null) { dumpWithClause = new ArrayList<>(); } @@ -138,7 +149,7 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin } WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, lastReplId, dumpWithClause); - if (oldReplPolicy != null) { + if (bootstrappedTables != null) { verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); } @@ -146,6 +157,15 
@@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin .run("use " + replicatedDbName) .run("show tables") .verifyResults(expectedTables); + + if (records == null) { + records = new String[] {"1"}; + } + for (String table : expectedTables) { + replica.run("use " + replicatedDbName) + .run("select a from " + table) + .verifyResults(records); + } return tuple.lastReplicationId; } @@ -585,4 +605,185 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws .run("show tables") .verifyResults(incrementalReplicatedTables); } + + @Test + public void testRenameTableScenariosBasic() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + String lastReplId = replicateAndVerify(replPolicy, null, null, null, + null, new String[]{}, new String[]{}); + + String[] originalNonAcidTables = new String[]{"in1", "in2", "out3", "out4", "out5", "out6"}; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + + // Replicate and verify if only 2 tables are replicated to target. + String[] replicatedTables = new String[]{"in1", "in2"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + // Rename tables to make them satisfy the filter. + primary.run("use " + primaryDbName) + .run("alter table out3 rename to in3") + .run("alter table out4 rename to in4") + .run("alter table out5 rename to in5"); + + replicatedTables = new String[]{"in1", "in2", "in3", "in4", "in5"}; + bootstrapTables = new String[]{"in3", "in4", "in5"}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + primary.run("use " + primaryDbName) + .run("alter table in3 rename to in7") + .run("alter table in7 rename to in8") // Double rename, both satisfying the filter, so no bootstrap. 
+ .run("alter table in4 rename to out9") // out9 does not match the filter so in4 should be dropped. + .run("alter table in5 rename to out10") // Rename from satisfying name to not satisfying name. + .run("alter table out10 rename to in11"); // from non satisfying to satisfying, should be bootstrapped + + replicatedTables = new String[]{"in1", "in2", "in8", "in11"}; + bootstrapTables = new String[]{"in11"}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + primary.run("use " + primaryDbName) + .run("alter table in8 rename to in12") // table is renamed from satisfying to satisfying, no bootstrap + .run("alter table out9 rename to in13") // out9 does not match the filter so in13 should be bootstrapped. + .run("alter table in13 rename to in14") // table is renamed from satisfying to satisfying + .run("alter table in2 rename to out200") // this will change the rename to drop in2 + .run("alter table out200 rename to in200") // this will add the bootstrap for in200 + .run("alter table in1 rename to out100") // this will change the rename to drop + .run("alter table out100 rename to in100") // this will add the bootstrap + .run("drop table in100"); // table in100 is dropped, so no bootstrap should happen. 
+ + replicatedTables = new String[]{"in200", "in12", "in12", "in14"}; + bootstrapTables = new String[]{"in14", "in200"}; + replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + } + + @Test + public void testRenameTableScenariosWithDmlOperations() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + String lastReplId = replicateAndVerify(replPolicy, null, null, null, + null, new String[]{}, new String[]{}); + + String[] originalFullAcidTables = new String[]{"in1"}; + String[] originalNonAcidTables = new String[]{"in100"}; + createTables(originalFullAcidTables, CreateTableType.FULL_ACID); + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + + // Replicate and verify if only 2 tables are replicated to target. + String[] replicatedTables = new String[]{"in1", "in100"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + // Rename tables and do some operations. 
+ primary.run("use " + primaryDbName) + .run("alter table in1 rename to out1") + .run("insert into out1 values(2, 100)") + .run("alter table out1 rename to in4") + .run("alter table in100 rename to out100") + .run("insert into out100 values(2, 100)") + .run("alter table out100 rename to in400"); + + replicatedTables = new String[]{"in4", "in400"}; + bootstrapTables = new String[]{"in4", "in400"}; + replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables, new String[] {"1", "2"}); + } + + @Test + public void testRenameTableScenariosAcidTable() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + List dumpWithClause = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='false'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'" + ); + String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause, + null, new String[]{}, new String[]{}); + + String[] originalNonAcidTables = new String[]{"in1", "out4"}; + String[] originalFullAcidTables = new String[]{"in2", "out5"}; + String[] originalMMAcidTables = new String[]{"out3", "out6"}; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalFullAcidTables, CreateTableType.FULL_ACID); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only 1 tables are replicated to target. Acid tables are not dumped. + String[] replicatedTables = new String[]{"in1"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + null, bootstrapTables, replicatedTables); + + // Rename tables to make them satisfy the filter and enable acid tables. 
+ primary.run("use " + primaryDbName) + .run("alter table out3 rename to in3") + .run("alter table out4 rename to in4") + .run("alter table out5 rename to in5"); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); + replicatedTables = new String[]{"in1", "in2", "in3", "in4", "in5"}; + bootstrapTables = new String[]{"in2", "in3", "in4", "in5"}; + replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + null, bootstrapTables, replicatedTables); + } + + @Test + public void testRenameTableScenariosExternalTable() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + List loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List dumpWithClause = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='false'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'" + ); + String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause, + loadWithClause, new String[]{}, new String[]{}); + + String[] originalNonAcidTables = new String[]{"in1", "out4"}; + String[] originalExternalTables = new String[]{"in2", "out5"}; + String[] originalMMAcidTables = new String[]{"in3", "out6"}; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalExternalTables, CreateTableType.EXTERNAL); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only 1 tables are replicated to target. Acid and external tables are not dumped. 
+ String[] replicatedTables = new String[]{"in1"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + loadWithClause, bootstrapTables, replicatedTables); + + // Rename tables to make them satisfy the filter and enable acid and external tables. + primary.run("use " + primaryDbName) + .run("alter table out4 rename to in4") + .run("alter table out5 rename to in5"); + + dumpWithClause = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'" + ); + replicatedTables = new String[]{"in1", "in2", "in3", "in4", "in5"}; + bootstrapTables = new String[]{"in2", "in3", "in4", "in5"}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + loadWithClause, null, replicatedTables); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); + + primary.run("use " + primaryDbName) + .run("alter table out6 rename to in6") // external table bootstrap. + .run("alter table in5 rename to out7") // in5 should be deleted. + .run("alter table out7 rename to in7") // MM table bootstrap. + .run("alter table in1 rename to out10") // in1 should be deleted. 
+ .run("alter table out10 rename to in11"); // normal table bootstrapped + + replicatedTables = new String[]{"in2", "in3", "in4", "in11", "in6", "in7"}; + bootstrapTables = new String[]{"in11", "in6", "in7"}; + replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + loadWithClause, bootstrapTables, replicatedTables); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 000d663ae9..b04ccb4279 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -75,6 +75,8 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; @@ -82,6 +84,7 @@ private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME; private static final long SLEEP_TIME = 60000; + Set tablesForBootstrap = new HashSet<>(); public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); private final String name; @@ -145,10 +148,12 @@ private void prepareReturnValues(List values) throws SemanticException { * those tables as a whole. * 3. If replication policy is changed/replaced, then need to examine all the tables to see if * any of them need to be bootstrapped as old policy doesn't include it but new one does. + * 4. Some tables are renamed and the new name satisfies the table list filter while old name was not. * @return true if need to examine tables for dump and false if not. 
*/ private boolean shouldExamineTablesToDump() { return (work.oldReplScope != null) + || !tablesForBootstrap.isEmpty() || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) || conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); } @@ -201,6 +206,12 @@ private boolean shouldBootstrapDumpTable(Table table) { return true; } + // If the table is renamed and the new name satisfies the filter but the old name does not then the table needs to + // be bootstrapped. + if (tablesForBootstrap.contains(table.getTableName().toLowerCase())) { + return true; + } + // If replication policy is changed with new included/excluded tables list, then tables which // are not included in old policy but included in new policy should be bootstrapped along with // the current incremental replication dump. @@ -320,10 +331,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } private boolean needBootstrapAcidTablesDuringIncrementalDump() { - // If old replication policy is available, then it is possible some of the ACID tables might be - // included for bootstrap during incremental dump. - return (ReplUtils.includeAcidTableInDump(conf) - && ((work.oldReplScope != null) || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES))); + // If ACID table dump is not enabled, then no need to check further. + if (!ReplUtils.includeAcidTableInDump(conf)) { + return false; + } + + // If old table level policy is available or the policy has filter based on table name then it is possible that some + // of the ACID tables might be included for bootstrap during incremental dump. For old policy, it's because the table + // may not satisfy the old policy but satisfy the new policy. For filter, it may happen that the table + // is renamed and starts satisfying the policy.
+ return ((!work.replScope.includeAllTables()) + || (work.oldReplScope != null) + || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); } private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) { @@ -341,7 +360,8 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) conf, getNewEventOnlyReplicationSpec(ev.getEventId()), work.replScope, - work.oldReplScope + work.oldReplScope, + tablesForBootstrap ); EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); eventHandler.handle(context); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index e95fe1f732..c1ebfd5ab0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -497,9 +497,6 @@ private int executeIncrementalLoad(DriverContext driverContext) { // If incremental events are already applied, then check and perform if need to bootstrap any tables. if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { - // No need to set incremental load pending flag for external tables as the files will be copied to the same path - // for external table unlike migrated txn tables. Currently bootstrap during incremental is done only for - // external tables. if (work.hasBootstrapLoadTasks()) { LOG.debug("Current incremental dump have tables to be bootstrapped. 
Switching to bootstrap " + "mode after applying all events."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index 4ec1bac6ee..7be415bb6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -96,6 +96,12 @@ public MessageHandler handler() { return new RenameTableHandler(); } }, + EVENT_RENAME_DROP_TABLE("EVENT_RENAME_DROP_TABLE") { + @Override + public MessageHandler handler() { + return new DropTableHandler(); + } + }, EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE") { @Override public MessageHandler handler() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 4500fb63e6..56850417dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -169,7 +169,8 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } private boolean shouldExport() { - return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, false, null, conf); + return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, + false, null, null, conf); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 11a4d6271e..b06279dff8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Set; import java.util.List; import java.util.Map; import java.util.UUID; @@ -176,8 +177,8 @@ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) 
thro * validates if a table can be exported, similar to EximUtil.shouldExport with few replication * specific checks. */ - public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, - boolean isEventDump, ReplScope oldReplScope, HiveConf hiveConf) { + public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, boolean isEventDump, + Set bootstrapTableList, ReplScope oldReplScope, HiveConf hiveConf) { if (replicationSpec == null) { replicationSpec = new ReplicationSpec(); } @@ -214,21 +215,26 @@ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab return false; } - // Skip dumping events related to ACID tables if bootstrap is enabled on it. - // Also, skip if current table is included only in new policy but not in old policy. - if (isEventDump) { - return !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) - && ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName()); + // Skip dumping events related to ACID tables if bootstrap is enabled for ACID tables. + if (isEventDump && hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) { + return false; } } - // If replication policy is replaced with new included/excluded tables list, then events - // corresponding to tables which are not included in old policy but included in new policy - // should be skipped. Those tables would be bootstrapped along with the current incremental - // replication dump. - // Note: If any event dump reaches here, it means, table is included in new replication policy. - if (isEventDump && !ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName())) { - return false; + // Tables which are selected for bootstrap should be skipped. Those tables would be bootstrapped + // along with the current incremental replication dump and thus no need to dump events for them. 
+ // Note: If any event (other than alter table with table level replication) dump reaches here, it means, table is + // included in new replication policy. + if (isEventDump) { + // If replication policy is replaced with new included/excluded tables list, then events + // corresponding to tables which are not included in old policy but included in new policy + // should be skipped. + if (!ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName())) { + return false; + } + + // Tables in the list of tables to be bootstrapped should be skipped. + return (bootstrapTableList == null || !bootstrapTableList.contains(tableHandle.getTableName())); } } return true; @@ -236,7 +242,8 @@ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab public static boolean shouldReplicate(NotificationEvent tableForEvent, ReplicationSpec replicationSpec, Hive db, - boolean isEventDump, ReplScope oldReplScope, + boolean isEventDump, Set bootstrapTableList, + ReplScope oldReplScope, HiveConf hiveConf) { Table table; try { @@ -247,7 +254,7 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, .getTableName(), e); return false; } - return shouldReplicate(replicationSpec, table, isEventDump, oldReplScope, hiveConf); + return shouldReplicate(replicationSpec, table, isEventDump, bootstrapTableList, oldReplScope, hiveConf); } static List getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java index 5db3f26262..e1b184df94 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import 
org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -35,6 +36,9 @@ AbortTxnMessage eventMessage(String stringRepresentation) { @Override public void handle(Context withinContext) throws Exception { + if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) { + return; + } LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java index be6574d562..3fb64fc444 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java @@ -32,6 +32,7 @@ boolean shouldReplicate(Context withinContext) { withinContext.replicationSpec, withinContext.db, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf ); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index b6d2d61616..42e74b37d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -67,8 +67,8 @@ public void handle(Context withinContext) throws Exception { } final Table qlMdTable = new Table(tobj); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + if 
(!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index 7d6599b768..50e0cd5f1f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -100,8 +100,8 @@ public void handle(Context withinContext) throws Exception { } Table qlMdTable = new Table(tableObject); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 89bd0bfbca..3dd0d3b5c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -30,11 +31,13 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import java.util.Set; + 
class AlterTableHandler extends AbstractEventHandler { private final org.apache.hadoop.hive.metastore.api.Table before; private final org.apache.hadoop.hive.metastore.api.Table after; private final boolean isTruncateOp; - private final Scenario scenario; + private Scenario scenario; private enum Scenario { ALTER { @@ -54,8 +57,15 @@ DumpType dumpType() { DumpType dumpType() { return DumpType.EVENT_TRUNCATE_TABLE; } + }, + DROP { + @Override + DumpType dumpType() { + return DumpType.EVENT_RENAME_DROP_TABLE; + } }; + abstract DumpType dumpType(); } @@ -82,14 +92,71 @@ private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before, } } + // return true, if event needs to be dumped, else return false. + private boolean handleForTableLevelReplication(Context withinContext) { + String oldName = before.getTableName(); + String newName = after.getTableName(); + + if (ReplUtils.tableIncludedInReplScope(withinContext.replScope, oldName)) { + // If the table is renamed after being added to the list of tables to be bootstrapped, then remove it from the + // list of tables to be bootstrapped. + boolean oldTableIsPresent = withinContext.removeFromListOfTablesForBootstrap(before.getTableName()); + + // If old table satisfies the filter, but the new table does not, then the old table should be dropped. + // This should be done, only if the old table is not in the list of tables to be bootstrapped which is a multi + // rename case. In case of multi rename, only the first rename should do the drop. + if (!ReplUtils.tableIncludedInReplScope(withinContext.replScope, newName)) { + if (oldTableIsPresent) { + // If the old table was present in the list of tables to be bootstrapped, then just ignore the event. 
+ return false; + } else { + scenario = Scenario.DROP; + LOG.info("Table " + oldName + " will be dropped as the table is renamed to " + newName); + return true; + } + } + + // If the old table was in the list of tables to be bootstrapped which is a multi rename case, the old table + // is removed from the list of tables to be bootstrapped and the new one is added. + if (oldTableIsPresent) { + withinContext.addToListOfTablesForBootstrap(newName); + return false; + } + + // If both old and new tables satisfy the filter and the old table is present at target, then dump the rename event. + LOG.info("both old and new table satisfies the filter"); + return true; + } else { + // If the old table does not satisfy the filter, but the new one satisfies, then the new table should be + // added to the list of tables to be bootstrapped and don't dump the event. + if (ReplUtils.tableIncludedInReplScope(withinContext.replScope, newName)) { + LOG.info("Table " + newName + " is added for bootstrap " + " during rename from " + oldName); + withinContext.addToListOfTablesForBootstrap(newName); + return false; + } + + // If neither the old nor the new table satisfies the filter, then don't dump the event. + LOG.info("both old and new table not satisfies the filter"); + return false; + } + } + @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), eventMessageAsJSON); Table qlMdTableBefore = new Table(before); + Set bootstrapTableList; + if (Scenario.RENAME == scenario) { + // Handling for table level replication is done in handleForTableLevelReplication method.
+ bootstrapTableList = null; + } else { + bootstrapTableList = withinContext.getTablesForBootstrap(); + } + if (!Utils .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, bootstrapTableList, withinContext.oldReplScope, withinContext.hiveConf)) { return; } @@ -101,6 +168,14 @@ public void handle(Context withinContext) throws Exception { } } + // If the tables are filtered based on name, then needs to handle the rename scenarios. + if (!withinContext.replScope.includeAllTables()) { + if (!handleForTableLevelReplication(withinContext)) { + LOG.info("Alter event for table " + before.getTableName() + " is skipped from dumping"); + return; + } + } + if (Scenario.ALTER == scenario) { withinContext.replicationSpec.setIsMetadataOnly(true); Table qlMdTableAfter = new Table(after); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index a13fe38140..2391b9b50d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -125,6 +125,9 @@ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hiv @Override public void handle(Context withinContext) throws Exception { + if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) { + return; + } LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON); String payload = eventMessageAsJSON; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index be7f7ebdd0..837d51c8c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -54,8 +54,8 @@ public void handle(Context withinContext) throws Exception { Table qlMdTable = new Table(tobj); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java index f8a9ace884..20cf596359 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java @@ -37,6 +37,13 @@ DropTableMessage eventMessage(String stringRepresentation) { @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON); + + // If table is present in the list of tables to be bootstrapped, then remove it. Drop event can be ignored as + // table will not be present at target. Anyways all the events related to this table is ignored. 
+ if (withinContext.removeFromListOfTablesForBootstrap(event.getTableName())) { + LOG.info("Table " + event.getTableName() + " is removed from list of tables to be bootstrapped."); + return; + } DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index b75d12b357..7d00f89a5b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import java.util.Set; public interface EventHandler { void handle(Context withinContext) throws Exception; @@ -42,9 +43,10 @@ final ReplicationSpec replicationSpec; final ReplScope replScope; final ReplScope oldReplScope; + private Set tablesForBootstrap; - public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, - ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope) { + public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, + ReplScope replScope, ReplScope oldReplScope, Set tablesForBootstrap) { this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; @@ -52,6 +54,7 @@ public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, this.replicationSpec = replicationSpec; this.replScope = replScope; this.oldReplScope = oldReplScope; + this.tablesForBootstrap = tablesForBootstrap; } public Context(Context other) { @@ -62,6 +65,7 @@ public Context(Context other) { this.replicationSpec = other.replicationSpec; this.replScope = other.replScope; this.oldReplScope = other.oldReplScope; + this.tablesForBootstrap = 
other.tablesForBootstrap; } void setEventRoot(Path eventRoot) { @@ -77,5 +81,19 @@ DumpMetaData createDmd(EventHandler eventHandler) { cmRoot, hiveConf ); } + + Set getTablesForBootstrap() { + return tablesForBootstrap; + } + + void addToListOfTablesForBootstrap(String tableName) { + assert tableName != null; + tablesForBootstrap.add(tableName.toLowerCase()); + } + + boolean removeFromListOfTablesForBootstrap(String tableName) { + assert tableName != null; + return tablesForBootstrap.remove(tableName.toLowerCase()); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 116db5bb69..5a18d573cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -58,8 +58,8 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec.setNoop(true); } - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java index 215e7261ff..f06b41083e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import 
org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -35,6 +36,9 @@ OpenTxnMessage eventMessage(String stringRepresentation) { @Override public void handle(Context withinContext) throws Exception { + if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) { + return; + } LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java index 23d088d744..432dd4452f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java @@ -52,8 +52,8 @@ public void handle(Context withinContext) throws Exception { return; } - if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), - true, withinContext.oldReplScope, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java index ab314edef8..75ee41f636 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java @@ -37,8 +37,8 @@ UpdateTableColumnStatMessage eventMessage(String stringRepresentation) { public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} UpdateTableColumnStat message 
: {}", fromEventId(), eventMessageAsJSON); Table qlMdTable = new Table(eventMessage.getTableObject()); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index bef4780eb0..898b839355 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -52,7 +52,8 @@ public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle, @Override public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { - if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, false, null, hiveConf)) { + if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, + false, null, null, hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index d227f3d0d0..4a07473550 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hive.ql.parse.repl.load.message; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.table.creation.DropTableDesc; import 
org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; import java.io.Serializable; import java.util.Collections; @@ -32,9 +34,18 @@ @Override public List> handle(Context context) throws SemanticException { - DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload()); - String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = msg.getTable(); + String actualDbName; + String actualTblName; + if (context.dmd.getDumpType() == DumpType.EVENT_RENAME_DROP_TABLE) { + AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload()); + actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; + actualTblName = msg.getTable(); + } else { + DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload()); + actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; + actualTblName = msg.getTable(); + } + DropTableDesc dropTableDesc = new DropTableDesc( actualDbName + "." 
+ actualTblName, null, true, true, context.eventOnlyReplicationSpec(), false diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java index e6a3e5cd7e..431413120c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; /** * Utility function that constructs a notification filter to check if table is accepted for replication. @@ -34,8 +35,12 @@ public ReplEventFilter(final ReplScope replScope) { boolean shouldAccept(final NotificationEvent event) { // All txn related events are global ones and should be always accepted. // For other events, if the DB/table names are included as per replication scope, then should - // accept the event. + // accept the event. For alter table with table name filter, bootstrap of the table will be done if the new table + // name matches the filter but the old table name does not. This can be judged only after deserializing the message. return (isTxnRelatedEvent(event) - || replScope.includedInReplScope(event.getDbName(), event.getTableName())); + || replScope.includedInReplScope(event.getDbName(), event.getTableName()) + || (replScope.dbIncludedInReplScope(event.getDbName()) + && event.getEventType().equals(MessageBuilder.ALTER_TABLE_EVENT)) + ); } }