diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index f67fc817d1..308d578e5d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -138,7 +138,7 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin } WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, lastReplId, dumpWithClause); - if (oldReplPolicy != null) { + if (bootstrappedTables != null) { verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); } @@ -585,4 +585,143 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws .run("show tables") .verifyResults(incrementalReplicatedTables); } + + @Test + public void testRenameTableScenariosBasic() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + String lastReplId = replicateAndVerify(replPolicy, null, null, null, + null, new String[]{}, new String[]{}); + + String[] originalNonAcidTables = new String[]{"in1", "in2", "out3", "out4", "out5", "out6"}; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + + // Replicate and verify if only 2 tables are replicated to target. + String[] replicatedTables = new String[]{"in1", "in2"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + // Rename tables to make them satisfy the filter. 
+ primary.run("use " + primaryDbName) + .run("alter table out3 rename to in3") + .run("alter table out4 rename to in4") + .run("alter table out5 rename to in5"); + + replicatedTables = new String[]{"in1", "in2", "in3", "in4", "in5"}; + bootstrapTables = new String[]{"in3", "in4", "in5"}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + primary.run("use " + primaryDbName) + .run("alter table in3 rename to in7") + .run("alter table in7 rename to in8") // Double rename, both satisfying the filter, so no bootstrap. + .run("alter table in4 rename to out9") // out9 does not match the filter so in4 should be dropped. + .run("alter table in5 rename to out10") // Rename from satisfying name to not satisfying name. + .run("alter table out10 rename to in11");// from non satisfying to satisfying, should be bootstrapped + + replicatedTables = new String[]{"in1", "in2", "in8", "in11"}; + bootstrapTables = new String[]{"in11"}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + + primary.run("use " + primaryDbName) + .run("alter table in8 rename to in12") // table is renamed from satisfying to satisfying, no bootstrap + .run("alter table out9 rename to in13") // out9 does not match the filter so in13 should be bootstrapped. + .run("alter table in13 rename to in14") // table is renamed from satisfying to satisfying + .run("drop table in14"); // table in14 is dropped, so no bootstrap should happen. 
+ + replicatedTables = new String[]{"in1", "in2", "in11", "in12"}; + bootstrapTables = new String[]{}; + replicateAndVerify(replPolicy, null, lastReplId, null, + null, bootstrapTables, replicatedTables); + } + + @Test + public void testRenameTableScenariosAcidTable() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + List dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='false'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'"); + String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause, + null, new String[]{}, new String[]{}); + + String[] originalNonAcidTables = new String[]{"in1", "out4"}; + String[] originalFullAcidTables = new String[]{"in2", "out5"}; + String[] originalMMAcidTables = new String[]{"out3", "out6"}; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalFullAcidTables, CreateTableType.FULL_ACID); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only 1 table is replicated to target. Acid tables are not dumped. + String[] replicatedTables = new String[]{"in1"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + null, bootstrapTables, replicatedTables); + + // Rename tables to make them satisfy the filter and enable acid tables. 
+ primary.run("use " + primaryDbName) + .run("alter table out3 rename to in3") + .run("alter table out4 rename to in4") + .run("alter table out5 rename to in5"); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); + replicatedTables = new String[]{"in1", "in2", "in3", "in4", "in5"}; + bootstrapTables = new String[]{"in2", "in3", "in4", "in5"}; + replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + null, bootstrapTables, replicatedTables); + } + + @Test + public void testRenameTableScenariosExternalTable() throws Throwable { + String replPolicy = primaryDbName + ".['in[0-9]+'].['out[0-9]+']"; + List loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='false'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'"); + String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause, + loadWithClause, new String[]{}, new String[]{}); + + String[] originalNonAcidTables = new String[]{"in1", "out4"}; + String[] originalExternalTables = new String[]{"in2", "out5"}; + String[] originalMMAcidTables = new String[]{"in3", "out6"}; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalExternalTables, CreateTableType.EXTERNAL); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only 1 tables are replicated to target. Acid and external tables are not dumped. 
+ String[] replicatedTables = new String[]{"in1"}; + String[] bootstrapTables = new String[]{}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + loadWithClause, bootstrapTables, replicatedTables); + + // Rename tables to make them satisfy the filter and enable acid and external tables. + primary.run("use " + primaryDbName) + .run("alter table out4 rename to in4") + .run("alter table out5 rename to in5"); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); + replicatedTables = new String[]{"in1", "in2", "in3", "in4", "in5"}; + bootstrapTables = new String[]{"in2", "in3", "in4", "in5"}; + lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + loadWithClause, null, replicatedTables); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); + + primary.run("use " + primaryDbName) + .run("alter table out6 rename to in6") // external table bootstrap. + .run("alter table in5 rename to out7") // in5 should be deleted. + .run("alter table out7 rename to in7") // MM table bootstrap. + .run("alter table in1 rename to out10") // in1 should be deleted. 
+ .run("alter table out10 rename to in11");// normal table bootstrapped + + replicatedTables = new String[]{"in2", "in3", "in4", "in11", "in6", "in7"}; + bootstrapTables = new String[]{"in11", "in6", "in7"}; + replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + loadWithClause, bootstrapTables, replicatedTables); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 000d663ae9..64f263e542 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -75,6 +75,7 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.HashSet; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; @@ -82,6 +83,7 @@ private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME; private static final long SLEEP_TIME = 60000; + HashSet tablesForBootstrap = new HashSet<>(); public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); private final String name; @@ -145,10 +147,12 @@ private void prepareReturnValues(List values) throws SemanticException { * those tables as a whole. * 3. If replication policy is changed/replaced, then need to examine all the tables to see if * any of them need to be bootstrapped as old policy doesn't include it but new one does. + * 4. Some tables are renamed and the new name satisfies the table list filter while old name was not. * @return true if need to examine tables for dump and false if not. 
 */ private boolean shouldExamineTablesToDump() { return (work.oldReplScope != null) + || !tablesForBootstrap.isEmpty() || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) || conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); } @@ -201,6 +205,12 @@ private boolean shouldBootstrapDumpTable(Table table) { return true; } + + // If the table is renamed and the new name satisfies the filter but the old name does not, then the table needs to + // be bootstrapped. + if (tablesForBootstrap.contains(table.getTableName())) { + return true; + } + // If replication policy is changed with new included/excluded tables list, then tables which // are not included in old policy but included in new policy should be bootstrapped along with // the current incremental replication dump. @@ -320,10 +330,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } private boolean needBootstrapAcidTablesDuringIncrementalDump() { - // If old replication policy is available, then it is possible some of the ACID tables might be - // included for bootstrap during incremental dump. - return (ReplUtils.includeAcidTableInDump(conf) - && ((work.oldReplScope != null) || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES))); + // If acid table dump is not enabled, then no need to check further. + if (!ReplUtils.includeAcidTableInDump(conf)) { + return false; + } + + // If old table level policy is available or the policy has filter based on table name then it is possible that some + // of the ACID tables might be included for bootstrap during incremental dump. For old policy, it's because the table + // may not satisfy the old policy but may satisfy the new policy. For filter, it may happen that the table + // is renamed and started satisfying the policy. 
+ return ((!work.replScope.includeAllTables()) + || (work.oldReplScope != null) + || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); } private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) { @@ -341,7 +359,8 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) conf, getNewEventOnlyReplicationSpec(ev.getEventId()), work.replScope, - work.oldReplScope + work.oldReplScope, + tablesForBootstrap ); EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); eventHandler.handle(context); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index e95fe1f732..c1ebfd5ab0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -497,9 +497,6 @@ private int executeIncrementalLoad(DriverContext driverContext) { // If incremental events are already applied, then check and perform if need to bootstrap any tables. if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { - // No need to set incremental load pending flag for external tables as the files will be copied to the same path - // for external table unlike migrated txn tables. Currently bootstrap during incremental is done only for - // external tables. if (work.hasBootstrapLoadTasks()) { LOG.debug("Current incremental dump have tables to be bootstrapped. 
Switching to bootstrap " + "mode after applying all events."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index 4ec1bac6ee..7be415bb6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -96,6 +96,12 @@ public MessageHandler handler() { return new RenameTableHandler(); } }, + EVENT_RENAME_DROP_TABLE("EVENT_RENAME_DROP_TABLE") { + @Override + public MessageHandler handler() { + return new DropTableHandler(); + } + }, EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE") { @Override public MessageHandler handler() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 4500fb63e6..56850417dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -169,7 +169,8 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } private boolean shouldExport() { - return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, false, null, conf); + return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, + false, null, null, conf); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 11a4d6271e..fb128c5554 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.UUID; @@ -176,8 +177,8 @@ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) 
thro * validates if a table can be exported, similar to EximUtil.shouldExport with few replication * specific checks. */ - public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, - boolean isEventDump, ReplScope oldReplScope, HiveConf hiveConf) { + public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, boolean isEventDump, + HashSet bootstrapTableList, ReplScope oldReplScope, HiveConf hiveConf) { if (replicationSpec == null) { replicationSpec = new ReplicationSpec(); } @@ -214,21 +215,26 @@ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab return false; } - // Skip dumping events related to ACID tables if bootstrap is enabled on it. - // Also, skip if current table is included only in new policy but not in old policy. - if (isEventDump) { - return !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) - && ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName()); + // Skip dumping events related to ACID tables if bootstrap is enabled for ACID tables. + if (isEventDump && hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) { + return false; } } - // If replication policy is replaced with new included/excluded tables list, then events - // corresponding to tables which are not included in old policy but included in new policy - // should be skipped. Those tables would be bootstrapped along with the current incremental - // replication dump. - // Note: If any event dump reaches here, it means, table is included in new replication policy. - if (isEventDump && !ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName())) { - return false; + // Tables which are selected for bootstrap should be skipped. Those tables would be bootstrapped + // along with the current incremental replication dump and thus no need to dump events for them. 
+ // Note: If any event (other than alter table with table level replication) dump reaches here, it means, table is + // included in new replication policy. + if (isEventDump) { + // If replication policy is replaced with new included/excluded tables list, then events + // corresponding to tables which are not included in old policy but included in new policy + // should be skipped. + if (!ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName())) { + return false; + } + + // Tables in the list of tables to be bootstrapped should be skipped. + return (!bootstrapTableList.contains(tableHandle.getTableName())); } } return true; @@ -236,7 +242,8 @@ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab public static boolean shouldReplicate(NotificationEvent tableForEvent, ReplicationSpec replicationSpec, Hive db, - boolean isEventDump, ReplScope oldReplScope, + boolean isEventDump, HashSet bootstrapTableList, + ReplScope oldReplScope, HiveConf hiveConf) { Table table; try { @@ -247,7 +254,7 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, .getTableName(), e); return false; } - return shouldReplicate(replicationSpec, table, isEventDump, oldReplScope, hiveConf); + return shouldReplicate(replicationSpec, table, isEventDump, bootstrapTableList, oldReplScope, hiveConf); } static List getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java index 5db3f26262..e1b184df94 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; +import 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -35,6 +36,9 @@ AbortTxnMessage eventMessage(String stringRepresentation) { @Override public void handle(Context withinContext) throws Exception { + if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) { + return; + } LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java index be6574d562..a9dc006cee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java @@ -32,6 +32,7 @@ boolean shouldReplicate(Context withinContext) { withinContext.replicationSpec, withinContext.db, true, + withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf ); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index b6d2d61616..164a66fa7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -68,7 +68,7 @@ public void handle(Context withinContext) throws Exception { final Table qlMdTable = new Table(tobj); if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index 7d6599b768..8a677e13ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -101,7 +101,7 @@ public void handle(Context withinContext) throws Exception { Table qlMdTable = new Table(tableObject); if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 89bd0bfbca..fc6978404f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -34,7 +35,7 @@ private final org.apache.hadoop.hive.metastore.api.Table before; private final org.apache.hadoop.hive.metastore.api.Table after; private final boolean isTruncateOp; - private final Scenario scenario; + private Scenario scenario; private enum Scenario { ALTER { @@ -54,8 +55,15 @@ DumpType dumpType() { DumpType dumpType() { return DumpType.EVENT_TRUNCATE_TABLE; } + }, + DROP { + @Override + DumpType 
dumpType() { + return DumpType.EVENT_RENAME_DROP_TABLE; + } }; + abstract DumpType dumpType(); } @@ -82,14 +90,49 @@ private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before, } } + + // Returns true if the event needs to be dumped, else returns false. + private boolean handleRenameDuringTableLevelReplication(Context withinContext) { + String oldName = before.getTableName(); + String newName = after.getTableName(); + + if (ReplUtils.tableIncludedInReplScope(withinContext.replScope, oldName)) { + // If old table satisfies the filter, but the new table does not, then the old table should be dropped. + if (!ReplUtils.tableIncludedInReplScope(withinContext.replScope, newName)) { + scenario = Scenario.DROP; + LOG.info("Table " + oldName + " will be dropped as the table is renamed to " + newName); + return true; + } + + // If both the old and new table names satisfy the filter, then dump the rename event. + return true; + } else { + // If the old table does not satisfy the filter, but the new one satisfies, then the new table should be + // added to the list of tables to be bootstrapped and don't dump the event. + if (ReplUtils.tableIncludedInReplScope(withinContext.replScope, newName)) { + LOG.info("Table " + newName + " is added for bootstrap " + " during rename from " + oldName); + withinContext.tablesForBootstrap.add(newName); + return false; + } + + // If neither the old nor the new table name satisfies the filter, then don't dump the event. + return false; + } + } + @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), eventMessageAsJSON); Table qlMdTableBefore = new Table(before); + if (Scenario.RENAME == scenario) { + // If the table is renamed after being added to the list of tables to be bootstrapped, then remove it from the + // list of tables to be bootstrapped. 
+ withinContext.tablesForBootstrap.remove(before.getTableName()); + } + if (!Utils .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } @@ -101,6 +144,14 @@ public void handle(Context withinContext) throws Exception { } } + // If the tables are filtered based on name, then needs to handle the rename scenarios. + if (withinContext.replScope != null && !withinContext.replScope.includeAllTables()) { + if (!handleRenameDuringTableLevelReplication(withinContext)) { + LOG.info("Alter event for table " + before.getTableName() + " is skipped from dumping"); + return; + } + } + if (Scenario.ALTER == scenario) { withinContext.replicationSpec.setIsMetadataOnly(true); Table qlMdTableAfter = new Table(after); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index a13fe38140..2391b9b50d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -125,6 +125,9 @@ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hiv @Override public void handle(Context withinContext) throws Exception { + if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) { + return; + } LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON); String payload = eventMessageAsJSON; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index be7f7ebdd0..64a8d6290d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -55,7 +55,7 @@ public void handle(Context withinContext) throws Exception { Table qlMdTable = new Table(tobj); if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java index f8a9ace884..75a65db33f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java @@ -37,6 +37,13 @@ DropTableMessage eventMessage(String stringRepresentation) { @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON); + + // If table is present in the list of tables to be bootstrapped, then remove it. Drop event can be ignored as + // table will not be present at target. Anyways all the events related to this table is ignored. 
+ if (withinContext.tablesForBootstrap.remove(event.getTableName())) { + LOG.info("Table " + event.getTableName() + " is removed from list of tables to be bootstrapped."); + return; + } DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index b75d12b357..f952161a2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import java.util.HashSet; public interface EventHandler { void handle(Context withinContext) throws Exception; @@ -42,9 +43,10 @@ final ReplicationSpec replicationSpec; final ReplScope replScope; final ReplScope oldReplScope; + HashSet tablesForBootstrap; - public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, - ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope) { + public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, + ReplScope replScope, ReplScope oldReplScope, HashSet tablesForBootstrap) { this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; @@ -52,6 +54,7 @@ public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, this.replicationSpec = replicationSpec; this.replScope = replScope; this.oldReplScope = oldReplScope; + this.tablesForBootstrap = tablesForBootstrap; } public Context(Context other) { @@ -62,6 +65,7 @@ public Context(Context other) { this.replicationSpec = other.replicationSpec; this.replScope = other.replScope; this.oldReplScope = other.oldReplScope; + this.tablesForBootstrap = 
other.tablesForBootstrap; } void setEventRoot(Path eventRoot) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 116db5bb69..a40c268a54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -59,7 +59,7 @@ public void handle(Context withinContext) throws Exception { } if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java index 215e7261ff..f06b41083e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -35,6 +36,9 @@ OpenTxnMessage eventMessage(String stringRepresentation) { @Override public void handle(Context withinContext) throws Exception { + if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) { + return; + } LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java index 23d088d744..e4dbcb3be8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java @@ -53,7 +53,7 @@ public void handle(Context withinContext) throws Exception { } if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java index ab314edef8..c7848edbbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java @@ -38,7 +38,7 @@ public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON); Table qlMdTable = new Table(eventMessage.getTableObject()); if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, - true, withinContext.oldReplScope, withinContext.hiveConf)) { + true, withinContext.tablesForBootstrap, withinContext.oldReplScope, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index bef4780eb0..898b839355 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -52,7 +52,8 @@ public 
TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle, @Override public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { - if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, false, null, hiveConf)) { + if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, + false, null, null, hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index f66a40833f..7d9d3bd3c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hive.ql.parse.repl.load.message; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.ql.ddl.DDLWork2; import org.apache.hadoop.hive.ql.ddl.table.creation.DropTableDesc; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; import java.io.Serializable; import java.util.Collections; @@ -32,9 +34,18 @@ @Override public List> handle(Context context) throws SemanticException { - DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload()); - String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = msg.getTable(); + String actualDbName; + String actualTblName; + if (context.dmd.getDumpType() == DumpType.EVENT_DROP_TABLE) { + AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload()); + actualDbName = context.isDbNameEmpty() ? 
msg.getDB() : context.dbName; + actualTblName = msg.getTable(); + } else { + DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload()); + actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; + actualTblName = msg.getTable(); + } + DropTableDesc dropTableDesc = new DropTableDesc( actualDbName + "." + actualTblName, null, true, true, context.eventOnlyReplicationSpec(), false diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java index 14b58a3f51..14c58f4bc1 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java @@ -53,6 +53,10 @@ public String getDbName() { return dbName; } + public Pattern getDbNamePattern() { + return dbNamePattern; + } + public void setIncludedTablePatterns(List includedTableNames) { this.includedTableNames = includedTableNames; this.includedTableNamePatterns = compilePatterns(includedTableNames); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java index e6a3e5cd7e..b4d365b9e7 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; /** * Utility 
function that constructs a notification filter to check if table is accepted for replication. @@ -30,12 +31,22 @@ public ReplEventFilter(final ReplScope replScope) { this.replScope = replScope; } + // Return false if all the tables are included, as bootstrap during rename is done only when a filter is set for tables. + boolean isAlterDuringTableLevelReplication(final NotificationEvent event) { + if (replScope.includeAllTables() || !replScope.getDbNamePattern().matcher(event.getDbName()).matches()) { + return false; + } + return event.getEventType().equals(MessageBuilder.ALTER_TABLE_EVENT); + } + @Override boolean shouldAccept(final NotificationEvent event) { // All txn related events are global ones and should be always accepted. // For other events, if the DB/table names are included as per replication scope, then should - // accept the event. - return (isTxnRelatedEvent(event) - || replScope.includedInReplScope(event.getDbName(), event.getTableName())); + // accept the event. For an alter table event with a table name filter, bootstrap of the table will be done if the new table + // name matches the filter but the old table name does not. This can be judged only after deserializing the message. return ((isTxnRelatedEvent(event) + || replScope.includedInReplScope(event.getDbName(), event.getTableName()) + || isAlterDuringTableLevelReplication(event))); } }