diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 4d47254e0c..b0c8e0d7bc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.security.UserGroupInformation; @@ -45,9 +47,10 @@ import java.io.InputStream; import java.io.InputStreamReader; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; /** * Tests Table level replication scenarios. @@ -156,7 +159,7 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin replica.run("drop database if exists " + replicatedDbName + " cascade"); } - WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, dumpWithClause); + WarehouseInstance.Tuple tuple = primary.dump(replPolicy, dumpWithClause); DumpMetaData dumpMetaData = new DumpMetaData(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), conf); Assert.assertEquals(oldReplPolicy != null && !replPolicy.equals(oldReplPolicy), @@ -221,7 +224,7 @@ private String replicateAndVerifyClearDump(String replPolicy, String oldReplPoli replica.run("drop database if exists " + replicatedDbName + " cascade"); } - WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, dumpWithClause); + WarehouseInstance.Tuple tuple = primary.dump(replPolicy, dumpWithClause); if (bootstrappedTables != null) { verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); @@ -347,7 +350,8 @@ public void testBasicIncrementalWithIncludeList() throws Throwable { // Replicate and verify if only 2 tables are replicated to target. String replPolicy = primaryDbName + ".'t1|t5'"; String[] replicatedTables = new String[] {"t1", "t5"}; - replicateAndVerify(replPolicy, tupleBootstrap.lastReplicationId, null, null, replicatedTables); + replicateAndVerify(replPolicy, primaryDbName, tupleBootstrap.lastReplicationId, null, + null, null, replicatedTables); } @Test @@ -362,7 +366,14 @@ public void testBasicIncrementalWithIncludeAndExcludeList() throws Throwable { // Replicate and verify if only 3 tables are replicated to target. String replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3'"; String[] replicatedTables = new String[] {"t1", "t111", "t2"}; - replicateAndVerify(replPolicy, tupleBootstrap.lastReplicationId, null, null, replicatedTables); + replicateAndVerify(replPolicy, primaryDbName, tupleBootstrap.lastReplicationId, null, + null, null, replicatedTables); + + //remove table expression. fallback to db level. 
+ replicatedTables = new String[] {"t1", "t111", "t2", "t11", "t3"}; + String[] bootstrappedTables = new String[] {"t11", "t3"}; + replicateAndVerify(primaryDbName, replPolicy, tupleBootstrap.lastReplicationId, null, + null, bootstrappedTables, replicatedTables); } @Test @@ -394,36 +405,9 @@ public void testIncorrectTablePolicyInReplDump() throws Throwable { Assert.assertTrue(failed); } - // Test incremental replication with invalid replication policies in REPLACE clause. - String replPolicy = primaryDbName; - WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) + primary.run("use " + primaryDbName) .dump(primaryDbName); replica.load(replicatedDbName, primaryDbName); - String lastReplId = tupleBootstrap.lastReplicationId; - for (String oldReplPolicy : invalidReplPolicies) { - failed = false; - try { - replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, null, replicatedTables); - } catch (Exception ex) { - LOG.info("Got exception: {}", ex.getMessage()); - Assert.assertTrue(ex instanceof ParseException); - failed = true; - } - Assert.assertTrue(failed); - } - - // Replace with replication policy having different DB name. - String oldReplPolicy = replPolicy; - replPolicy = primaryDbName + "_dupe.'t1+'.'t1'"; - failed = false; - try { - replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, null, replicatedTables); - } catch (Exception ex) { - LOG.info("Got exception: {}", ex.getMessage()); - Assert.assertTrue(ex instanceof SemanticException); - failed = true; - } - Assert.assertTrue(failed); // Invalid pattern, include/exclude table list is empty. invalidReplPolicies = new String[] { @@ -708,7 +692,7 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .dump(replPolicy, oldReplPolicy, dumpWithClause); + .dump(replPolicy, dumpWithClause); loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; @@ -742,7 +726,7 @@ public void testRenameTableScenariosBasic() throws Throwable { // Replicate and verify if only 2 tables are replicated to target. String[] replicatedTables = new String[] {"in1", "in2"}; String[] bootstrapTables = new String[] {}; - lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + lastReplId = replicateAndVerify(replPolicy, replPolicy, lastReplId, null, null, bootstrapTables, replicatedTables); // Rename tables to make them satisfy the filter. 
@@ -753,7 +737,7 @@ public void testRenameTableScenariosBasic() throws Throwable { replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"}; bootstrapTables = new String[] {"in3", "in4", "in5"}; - lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + lastReplId = replicateAndVerify(replPolicy, replPolicy, lastReplId, null, null, bootstrapTables, replicatedTables); primary.run("use " + primaryDbName) @@ -765,7 +749,7 @@ public void testRenameTableScenariosBasic() throws Throwable { replicatedTables = new String[] {"in1", "in2", "in8", "in11"}; bootstrapTables = new String[] {"in11"}; - lastReplId = replicateAndVerify(replPolicy, null, lastReplId, null, + lastReplId = replicateAndVerify(replPolicy, replPolicy, lastReplId, null, null, bootstrapTables, replicatedTables); primary.run("use " + primaryDbName) @@ -780,7 +764,7 @@ public void testRenameTableScenariosBasic() throws Throwable { replicatedTables = new String[] {"in200", "in12", "in11", "in14"}; bootstrapTables = new String[] {"in14", "in200"}; - replicateAndVerify(replPolicy, null, lastReplId, null, + replicateAndVerify(replPolicy, replPolicy, lastReplId, null, null, bootstrapTables, replicatedTables); } @@ -836,7 +820,7 @@ public void testRenameTableScenariosAcidTable() throws Throwable { // Replicate and verify if only 1 tables are replicated to target. Acid tables are not dumped. String[] replicatedTables = new String[] {"in1"}; String[] bootstrapTables = new String[] {}; - lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + lastReplId = replicateAndVerify(replPolicy, replPolicy, lastReplId, dumpWithClause, null, bootstrapTables, replicatedTables); // Rename tables to make them satisfy the filter and enable acid tables. @@ -849,7 +833,7 @@ public void testRenameTableScenariosAcidTable() throws Throwable { "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"); replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"}; bootstrapTables = new String[] {"in2", "in3", "in4", "in5"}; - replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, + replicateAndVerify(replPolicy, replPolicy, lastReplId, dumpWithClause, null, bootstrapTables, replicatedTables); } @@ -1093,4 +1077,302 @@ public void testRenameTableScenariosUpgrade() throws Throwable { replicateAndVerify(replPolicy, newReplPolicy, lastReplId, dumpWithClause, loadWithClause, bootstrapTables, replicatedTables); } + + @Test + public void testCheckPointingDataDumpFailureSamePolicyExpression() throws Throwable { + //To force distcp copy + List dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + String replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3'"; + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(replPolicy); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME); + long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime(); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbDataPath, "t1"); + Path tablet2Path = new Path(dbDataPath, "t2"); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2 + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + FileStatus[] statuses = fs.listStatus(tablet2Path); + //Delete t2 data + fs.delete(statuses[0].getPath(), true); + long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime(); + long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime(); + //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater + WarehouseInstance.Tuple nextDump = primary.dump(replPolicy, dumpClause); + assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime()); + assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime()); + assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime()); + } + + @Test + public void testCheckPointingDataDumpFailureDiffPolicyExpression() throws Throwable { + //To force distcp copy + List dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + String replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3'"; + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(replPolicy); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet2Path = new Path(dbDataPath, "t2"); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2 + 
fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + FileStatus[] statuses = fs.listStatus(tablet2Path); + //Delete t2 data + fs.delete(statuses[0].getPath(), true); + //Do another dump with expression modified. It should redo the dump + replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3|t12'"; + WarehouseInstance.Tuple nextDump = primary.dump(replPolicy, dumpClause); + assertNotEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + dumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + } + + @Test + public void testIncrementalDumpCheckpointingSameExpression() throws Throwable { + String replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3'"; + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .dump(replPolicy); + + replica.load(replicatedDbName, replPolicy) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {}); + + + //Case 1: When the last dump finished all the events and + //only _finished_dump file at the hiveDumpRoot was about to be written when it failed. + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .run("insert into t2 values (2)") + .dump(replPolicy); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + + long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > (firstIncEventID + 1)); + Map pathModTimeMap = new HashMap<>(); + for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(replPolicy); + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + assertTrue(fs.exists(ackFile)); + //check events were not rewritten. 
+ for(Map.Entry entry :pathModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), + fs.getFileStatus(new Path(hiveDumpDir, entry.getKey())).getModificationTime()); + } + + replica.load(replicatedDbName, replPolicy) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2"}); + + + //Case 2: When the last dump was half way through + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName) + .run("insert into t1 values (3)") + .run("insert into t2 values (4)") + .dump(replPolicy); + + hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + //delete last three events and test if it recovers. + long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId); + Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID)); + Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1)); + Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2)); + assertTrue(fs.exists(lastEvtRoot)); + assertTrue(fs.exists(secondLastEvtRoot)); + assertTrue(fs.exists(thirdLastEvtRoot)); + + pathModTimeMap = new HashMap<>(); + for (long idx = Long.parseLong(incrementalDump2.lastReplicationId)+1; idx < (lastEventID - 2); idx++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(idx)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + long lastEvtModTimeOld = fs.getFileStatus(lastEvtRoot).getModificationTime(); + long secondLastEvtModTimeOld = fs.getFileStatus(secondLastEvtRoot).getModificationTime(); + long thirdLastEvtModTimeOld = fs.getFileStatus(thirdLastEvtRoot).getModificationTime(); + + fs.delete(lastEvtRoot, true); + fs.delete(secondLastEvtRoot, true); + fs.delete(thirdLastEvtRoot, true); + org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID - 3), ackLastEventID, + primary.hiveConf); + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName) + .dump(replPolicy); + + assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation); + + assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > lastEvtModTimeOld); + assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > secondLastEvtModTimeOld); + assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > thirdLastEvtModTimeOld); + + //Check other event dump files have not been modified. 
+ for (Map.Entry entry:pathModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), fs.getFileStatus(entry.getKey()).getModificationTime()); + } + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "3"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2", "4"}); + } + + @Test + public void testIncrementalDumpCheckpointingDiffExpression() throws Throwable { + String replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3'"; + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .dump(replPolicy); + + replica.load(replicatedDbName, replPolicy) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {}); + + //When the last dump was half way through and expression modified + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName) + .run("insert into t1 values (3)") + .run("insert into t2 values (4)") + .dump(replPolicy); + + Path hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + //delete last three events and test if it recovers. + long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId); + Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID)); + Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1)); + Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2)); + assertTrue(fs.exists(lastEvtRoot)); + assertTrue(fs.exists(secondLastEvtRoot)); + assertTrue(fs.exists(thirdLastEvtRoot)); + + Map pathModTimeMap = new HashMap<>(); + for (long idx = Long.parseLong(bootstrapDump.lastReplicationId)+1; idx < (lastEventID - 2); idx++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(idx)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + + fs.delete(lastEvtRoot, true); + fs.delete(secondLastEvtRoot, true); + fs.delete(thirdLastEvtRoot, true); + org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID - 3), ackLastEventID, + primary.hiveConf); + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3|t13'"; + WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName) + .dump(replPolicy); + + assertNotEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation); + + replica.load(replicatedDbName, replPolicy) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"3"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"4"}); + } + + } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index ed12478bdd..89e535d42a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -268,27 +268,16 @@ Tuple dump(String dbName) return dump(dbName, Collections.emptyList()); } - Tuple dump(String dbName, List withClauseOptions) + Tuple dump(String dumpExpression, List withClauseOptions) throws Throwable { String dumpCommand = - "REPL DUMP " + dbName; + "REPL DUMP " + dumpExpression; if (withClauseOptions != null && !withClauseOptions.isEmpty()) { dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")"; } return dumpWithCommand(dumpCommand); } - Tuple dump(String replPolicy, String oldReplPolicy, List withClauseOptions) - throws Throwable { - String dumpCommand = - "REPL DUMP " + replPolicy - + (oldReplPolicy == null ? "" : " REPLACE " + oldReplPolicy); - if (!withClauseOptions.isEmpty()) { - dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")"; - } - return dumpWithCommand(dumpCommand); - } - Tuple dumpWithCommand(String dumpCommand) throws Throwable { advanceDumpDir(); run(dumpCommand); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 7e690fce35..caafb8270e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -163,6 +163,9 @@ public int execute() { Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); boolean isBootstrap = (previousValidHiveDumpPath == null); work.setBootstrap(isBootstrap); + if (previousValidHiveDumpPath != null) { + work.setOldReplScope(new DumpMetaData(previousValidHiveDumpPath, conf).getReplScope()); + } //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. if (shouldDump(previousValidHiveDumpPath)) { Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); @@ -410,12 +413,16 @@ private boolean shouldDump(Path previousDumpPath) throws IOException { * @return true if need to examine tables for dump and false if not. */ private boolean shouldExamineTablesToDump() { - return (work.oldReplScope != null) + return (previousReplScopeModified()) || !tablesForBootstrap.isEmpty() || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) || conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); } + private boolean previousReplScopeModified() { + return work.oldReplScope != null && !work.oldReplScope.equals(work.replScope); + } + /** * Decide whether to dump external tables data. If external tables are enabled for replication, * then need to dump it's data in all the incremental dumps. 
@@ -562,53 +569,57 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive : "?"; long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit); - replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, - work.eventFrom, work.eventTo, maxEventLimit); - replLogger.startLog(); - Map metricMap = new HashMap<>(); - metricMap.put(ReplUtils.MetricName.EVENTS.name(), estimatedNumEvents); - work.getMetricCollector().reportStageStart(getName(), metricMap); - long dumpedCount = resumeFrom - work.eventFrom; - if (dumpedCount > 0) { - LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); - } - cleanFailedEventDirIfExists(dumpRoot, resumeFrom); - while (evIter.hasNext()) { - NotificationEvent ev = evIter.next(); - lastReplId = ev.getEventId(); - if (ev.getEventId() <= resumeFrom) { - continue; + try { + replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, + work.eventFrom, work.eventTo, maxEventLimit); + replLogger.startLog(); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.EVENTS.name(), estimatedNumEvents); + work.getMetricCollector().reportStageStart(getName(), metricMap); + long dumpedCount = resumeFrom - work.eventFrom; + if (dumpedCount > 0) { + LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); } + cleanFailedEventDirIfExists(dumpRoot, resumeFrom); + while (evIter.hasNext()) { + NotificationEvent ev = evIter.next(); + lastReplId = ev.getEventId(); + if (ev.getEventId() <= resumeFrom) { + continue; + } - //disable materialized-view replication if not configured - if(!isMaterializedViewsReplEnabled()){ - String tblName = ev.getTableName(); - if(tblName != null) { - try { - Table table = hiveDb.getTable(dbName, tblName); - if (table != null && TableType.MATERIALIZED_VIEW.equals(table.getTableType())){ - LOG.info("Attempt to dump materialized view : " + tblName); - continue; + //disable materialized-view replication if not configured + if (!isMaterializedViewsReplEnabled()) { + String tblName = ev.getTableName(); + if (tblName != null) { + try { + Table table = hiveDb.getTable(dbName, tblName); + if (table != null && TableType.MATERIALIZED_VIEW.equals(table.getTableType())) { + LOG.info("Attempt to dump materialized view : " + tblName); + continue; + } + } catch (InvalidTableException te) { + LOG.debug(te.getMessage()); } - } catch (InvalidTableException te) { - LOG.debug(te.getMessage()); } } - } - Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); - dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); - Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf); + Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); + dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); + Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf); + } + replLogger.endLog(lastReplId.toString()); + LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); + } finally { + //write the dmd always irrespective of success/failure to enable checkpointing in table level replication + long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId, + previousReplScopeModified()); + // If repl policy is changed (oldReplScope is set), then pass the current replication policy, + // so that REPL LOAD 
would drop the tables which are not included in current policy. + dmd.setReplScope(work.replScope); + dmd.write(true); } - replLogger.endLog(lastReplId.toString()); - LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); - long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); - dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId, - work.oldReplScope != null); - // If repl policy is changed (oldReplScope is set), then pass the current replication policy, - // so that REPL LOAD would drop the tables which are not included in current policy. - dmd.setReplScope(work.replScope); - dmd.write(true); int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { @@ -762,7 +773,7 @@ private boolean needBootstrapAcidTablesDuringIncrementalDump() { // may not satisfying the old policy but satisfying the new policy. For filter, it may happen that the table // is renamed and started satisfying the policy. return ((!work.replScope.includeAllTables()) - || (work.oldReplScope != null) + || (previousReplScopeModified()) || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); } @@ -934,17 +945,19 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) replLogger.endLog(bootDumpBeginReplId.toString()); work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId); } - Long bootDumpEndReplId = currentNotificationId(hiveDb); + work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator()); + setDataCopyIterators(extTableFileList, managedTblList); LOG.info("Preparing to return {},{}->{}", - dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); + dumpRoot.toUri(), bootDumpBeginReplId, currentNotificationId(hiveDb)); + return bootDumpBeginReplId; + } finally { + //write the dmd always irrespective of success/failure to enable checkpointing in table level replication + Long bootDumpEndReplId = currentNotificationId(hiveDb); long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId, - work.oldReplScope != null); + previousReplScopeModified()); dmd.setReplScope(work.replScope); dmd.write(true); - work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator()); - setDataCopyIterators(extTableFileList, managedTblList); - return bootDumpBeginReplId; } } @@ -968,8 +981,12 @@ private boolean shouldResumePreviousDump(Path lastDumpPath, boolean isBootStrap) return false; } Path hiveDumpPath = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dumpMetaData = new DumpMetaData(hiveDumpPath, conf); + if (tableExpressionModified(dumpMetaData)) { + return false; + } if (isBootStrap) { - return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + return shouldResumePreviousDump(dumpMetaData); } // In case of incremental we should resume if _events_dump file is present and is valid Path lastEventFile = new Path(hiveDumpPath, ReplAck.EVENTS_DUMP.toString()); @@ -982,6 +999,17 @@ private boolean shouldResumePreviousDump(Path lastDumpPath, boolean isBootStrap) return resumeFrom > 0L; } + private boolean tableExpressionModified(DumpMetaData dumpMetaData) { + try { + //Check if last dump was with same repl scope. 
If not, the table expression was modified, so restart the dump. + //Don't use checkpointing if the repl scope is modified + return !dumpMetaData.getReplScope().equals(work.replScope); + } catch (Exception e) { + LOG.info("No previous dump present"); + return false; + } + } + long currentNotificationId(Hive hiveDb) throws TException { return hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index aecfe757a4..47051e41ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -47,7 +46,7 @@ private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(ReplDumpWork.class); final ReplScope replScope; - final ReplScope oldReplScope; + ReplScope oldReplScope; final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath; Long eventTo; Long eventFrom; @@ -88,16 +87,19 @@ public static void testDeletePreviousDumpMetaPath(boolean failDeleteDumpMeta) { testDeletePreviousDumpMetaPath = failDeleteDumpMeta; } - public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope, + public ReplDumpWork(ReplScope replScope, String astRepresentationForErrorMsg, String resultTempPath) { this.replScope = replScope; - this.oldReplScope = oldReplScope; this.dbNameOrPattern = replScope.getDbName(); this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.resultTempPath = resultTempPath; } + void setOldReplScope(ReplScope replScope) { + oldReplScope = replScope; + } + int maxEventLimit() throws Exception { if (eventTo < eventFrom) { throw new Exception("Invalid event ID input received in TO clause"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index ed408b0c1c..60ada73ae9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -65,7 +65,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // Replication Scope private ReplScope replScope = new ReplScope(); - private ReplScope oldReplScope = null; // Source DB Name for REPL LOAD private String sourceDbNameOrPattern; @@ -143,34 +142,6 @@ private void setReplDumpTablesList(Tree replTablesNode, ReplScope replScope) thr } } - private void setOldReplPolicy(Tree oldReplPolicyTree) throws HiveException { - oldReplScope = new ReplScope(); - int childCount = oldReplPolicyTree.getChildCount(); - - // First child is DB name and optional second child is tables list. - assert(childCount <= 2); - - // First child is always the DB name. So set it. 
- oldReplScope.setDbName(oldReplPolicyTree.getChild(0).getText()); - LOG.info("Old ReplScope: Set DB Name: {}", oldReplScope.getDbName()); - if (!oldReplScope.getDbName().equalsIgnoreCase(replScope.getDbName())) { - LOG.error("DB name {} cannot be replaced to {} in the replication policy.", - oldReplScope.getDbName(), replScope.getDbName()); - throw new SemanticException("DB name cannot be replaced in the replication policy."); - } - - // If the old policy is just , then tables list won't be there. - if (childCount <= 1) { - return; - } - - // Traverse the children which can be either just include tables list or both include - // and exclude tables lists. - Tree oldPolicyTablesListNode = oldReplPolicyTree.getChild(1); - assert(oldPolicyTablesListNode.getType() == TOK_REPL_TABLES); - setReplDumpTablesList(oldPolicyTablesListNode, oldReplScope); - } - private void initReplDump(ASTNode ast) throws HiveException { int numChildren = ast.getChildCount(); boolean isMetaDataOnly = false; @@ -196,9 +167,6 @@ private void initReplDump(ASTNode ast) throws HiveException { case TOK_REPL_TABLES: setReplDumpTablesList(currNode, replScope); break; - case TOK_REPLACE: - setOldReplPolicy(currNode); - break; default: throw new SemanticException("Unrecognized token " + currNode.getType() + " in REPL DUMP statement."); } @@ -237,7 +205,6 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { Task replDumpWorkTask = TaskFactory .get(new ReplDumpWork( replScope, - oldReplScope, ASTErrorUtils.getMsg(ErrorMsg.INVALID_PATH.getMsg(), ast), ctx.getResFile().toUri().toString() ), conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 0d5d4eaa47..d654d3d042 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -210,7 +210,7 @@ public void handle(Context withinContext) throws Exception { String oldName = before.getTableName(); String newName = after.getTableName(); boolean needDump = true; - if (withinContext.oldReplScope != null) { + if (withinContext.oldReplScope != null && !withinContext.oldReplScope.equals(withinContext.replScope)) { needDump = handleRenameForReplacePolicy(withinContext, oldName, newName); } else if (!withinContext.replScope.includeAllTables()) { needDump = handleRenameForTableLevelReplication(withinContext, oldName, newName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index c428ea25f4..eb87bb925d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -84,6 +84,10 @@ public void setReplScope(ReplScope replScope) { this.replScope = replScope; } + public void setDumpType(DumpType dumpType) { + this.dumpType = dumpType; + } + private void readReplScope(String line) throws IOException { if (line == null) { return; @@ -121,9 +125,12 @@ private void loadDumpFromFile() throws SemanticException { String line; if ((line = br.readLine()) != null) { String[] lineContents = line.split("\t", 7); - setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), - Long.valueOf(lineContents[2]), - new Path(lineContents[3]), Long.valueOf(lineContents[4]), Boolean.valueOf(lineContents[6])); + 
setDump(lineContents[0].equals(Utilities.nullStringOutput) ? null : DumpType.valueOf(lineContents[0]), + lineContents[1].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[1]), + lineContents[2].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[2]), + lineContents[3].equals(Utilities.nullStringOutput) ? null : new Path(lineContents[3]), + lineContents[4].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[4]), + Boolean.valueOf(lineContents[6])); setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]); } else { throw new IOException( @@ -222,11 +229,11 @@ public void write(boolean replace) throws SemanticException { List<List<String>> listValues = new ArrayList<>(); listValues.add( Arrays.asList( - dumpType.toString(), - eventFrom.toString(), - eventTo.toString(), - cmRoot.toString(), - dumpExecutionId.toString(), + dumpType != null ? dumpType.toString() : null, + eventFrom != null ? eventFrom.toString() : null, + eventTo != null ? eventTo.toString() : null, + cmRoot != null ? cmRoot.toString() : null, + dumpExecutionId != null ? dumpExecutionId.toString() : null, payload, String.valueOf(replScopeModified)) ); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 3f47678988..aede902d4a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -147,8 +147,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, }; task.initialize(queryState, null, null, null); - ReplDumpWork replDumpWork = new ReplDumpWork(replScope, - null, "", ""); + ReplDumpWork replDumpWork = new ReplDumpWork(replScope, "", ""); replDumpWork.setMetricCollector(metricCollector); task.setWork(replDumpWork); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java index 8b26f1ce40..ccf22b9c7b 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.common.repl; import java.io.Serializable; +import java.util.Objects; import java.util.regex.Pattern; /** @@ -116,4 +117,24 @@ private boolean inTableExcludedList(final String tableName) { } return excludedTableNamePattern.matcher(tableName).matches(); } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (! (o instanceof ReplScope)) { + return false; + } + + ReplScope replScope = (ReplScope)o; + return Objects.equals(replScope.excludedTableNames, this.excludedTableNames) + && Objects.equals(replScope.includedTableNames, this.includedTableNames) + && Objects.equals(replScope.dbName, this.dbName); + } + + @Override + public int hashCode() { + //exclude dbNamePattern: it is derived from dbName and Pattern uses identity hashCode, which would break the equals/hashCode contract + return Objects.hash(dbName, includedTableNames, excludedTableNames); + } }
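
Illustrative sketch, not part of the patch and not Hive API: under simplifying assumptions it mirrors the resume-vs-restart decision the patch adds around ReplDumpTask.shouldResumePreviousDump() for the incremental case, which the new ReplScope.equals() makes possible. ReplScopeLike and CheckpointDecision are hypothetical stand-ins for the real Hive classes.

import java.util.Objects;
import java.util.Set;

final class ReplScopeLike {
  final String dbName;
  final Set<String> includedTables;
  final Set<String> excludedTables;

  ReplScopeLike(String dbName, Set<String> includedTables, Set<String> excludedTables) {
    this.dbName = dbName;
    this.includedTables = includedTables;
    this.excludedTables = excludedTables;
  }

  // Same idea as the equals() added to ReplScope: two scopes are equal when the db name
  // and the include/exclude table lists all match.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ReplScopeLike)) {
      return false;
    }
    ReplScopeLike other = (ReplScopeLike) o;
    return Objects.equals(dbName, other.dbName)
        && Objects.equals(includedTables, other.includedTables)
        && Objects.equals(excludedTables, other.excludedTables);
  }

  @Override
  public int hashCode() {
    return Objects.hash(dbName, includedTables, excludedTables);
  }
}

final class CheckpointDecision {
  /**
   * Simplified incremental-dump rule: a previously failed dump directory is resumed only
   * when the REPL DUMP policy expression is unchanged and a valid _events_dump marker
   * exists to tell us which event id to resume from; otherwise the dump restarts.
   */
  static boolean canResumePreviousDump(ReplScopeLike lastDumpScope, ReplScopeLike currentScope,
      boolean eventsDumpMarkerPresent) {
    if (lastDumpScope == null || !lastDumpScope.equals(currentScope)) {
      return false; // table expression changed (or unknown): restart the dump from scratch
    }
    return eventsDumpMarkerPresent;
  }
}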