diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 39d876802a..0e92d7508b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -248,45 +248,18 @@ private synchronized void advanceDumpDir() {
   }
 
   private Tuple bootstrapLoadAndVerify(String dbName, String replDbName) throws IOException {
-    return incrementalLoadAndVerify(dbName, null, replDbName);
+    return incrementalLoadAndVerify(dbName, replDbName);
   }
 
-  private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String replDbName) throws IOException {
-    Tuple dump = replDumpDb(dbName, fromReplId, null, null);
+  private Tuple incrementalLoadAndVerify(String dbName, String replDbName) throws IOException {
+    Tuple dump = replDumpDb(dbName);
     loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId);
     return dump;
   }
 
-  private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String toReplId, String replDbName)
-      throws IOException {
-    Tuple dump = replDumpDb(dbName, fromReplId, toReplId, null);
-    loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId);
-    return dump;
-  }
-
-  private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String toReplId, String limit,
-      String replDbName) throws IOException {
-    Tuple dump = replDumpDb(dbName, fromReplId, toReplId, limit);
-    loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId);
-    return dump;
-  }
-
-  private Tuple dumpDbFromLastDump(String dbName, Tuple lastDump) throws IOException {
-    return replDumpDb(dbName, lastDump.lastReplId, null, null);
-  }
-
-  private Tuple replDumpDb(String dbName, String fromReplID, String toReplID, String limit) throws IOException {
+  private Tuple replDumpDb(String dbName) throws IOException {
     advanceDumpDir();
     String dumpCmd = "REPL DUMP " + dbName;
-    if (null != fromReplID) {
-      dumpCmd = dumpCmd + " FROM " + fromReplID;
-    }
-    if (null != toReplID) {
-      dumpCmd = dumpCmd + " TO " + toReplID;
-    }
-    if (null != limit) {
-      dumpCmd = dumpCmd + " LIMIT " + limit;
-    }
     run(dumpCmd, driver);
     String dumpLocation = getResult(0, 0, driver);
     String lastReplId = getResult(0, 1, true, driver);
@@ -413,7 +386,7 @@ public void testTaskCreationOptimization() throws Throwable {
     run("create table " + dbName + ".t2 (place string) partitioned by (country string)", driver);
     run("insert into table " + dbName + ".t2 partition(country='india') values ('bangalore')", driver);
 
-    Tuple dump = replDumpDb(dbName, null, null, null);
+    Tuple dump = replDumpDb(dbName);
 
     //bootstrap load should not have move task
     Task task = getReplLoadRootTask(dbNameReplica, false, dump);
@@ -423,7 +396,7 @@ public void testTaskCreationOptimization() throws Throwable {
     loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
-    dump = replDumpDb(dbName, dump.lastReplId, null, null);
+    dump = replDumpDb(dbName);
 
     // Partition level statistics gets updated as part of the INSERT above. So we see a partition
     // task corresponding to an ALTER_PARTITION event.
@@ -434,7 +407,7 @@ public void testTaskCreationOptimization() throws Throwable {
     loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
-    dump = replDumpDb(dbName, dump.lastReplId, null, null);
+    dump = replDumpDb(dbName);
 
     //no move task should be added as the operation is adding a dynamic partition
     task = getReplLoadRootTask(dbNameReplica, true, dump);
@@ -810,7 +783,7 @@ public void run() {
       InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour
     }
 
-    incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     verifyIfTableNotExist(replDbName, "ptned", metaStoreClientMirror);
   }
@@ -863,7 +836,7 @@ public void testIncrementalAdds() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver);
 
     // Perform REPL-DUMP/LOAD
-    incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     // VERIFY tables and partitions on destination for equivalence.
     verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror);
@@ -897,7 +870,7 @@ public void testIncrementalLoadWithVariableLengthEventId() throws IOException, T
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
 
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
-    Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null);
+    Tuple incrementalDump = replDumpDb(dbName);
     String incrementalDumpLocn = incrementalDump.dumpLocation;
     replDumpId = incrementalDump.lastReplId;
@@ -1057,7 +1030,7 @@ public void testDrops() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2, driver);
 
     // replicate the incremental drops
-    incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     // verify that drops were replicated. This can either be from tables or ptns
     // not existing, and thus, throwing a NoSuchObjectException, or returning nulls
@@ -1261,7 +1234,7 @@ public void testTableAlters() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2, driver);
 
     // All alters done, now we replicate them over.
-    incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     // Replication done, we now do the following verifications:
@@ -1332,7 +1305,7 @@ public void testDatabaseAlters() throws IOException {
     run("ALTER DATABASE " + dbName + " SET DBPROPERTIES ('" + testKey + "' = '" + testVal + "')", driver);
 
     // All alters done, now we replicate them over.
-    Tuple incremental = incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName);
+    Tuple incremental = incrementalLoadAndVerify(dbName, replDbName);
 
     // Replication done, we need to check if the new property is added
     try {
@@ -1348,7 +1321,7 @@ public void testDatabaseAlters() throws IOException {
     run("ALTER DATABASE " + dbName + " SET DBPROPERTIES ('" + testKey + "' = '" + newValue + "')", driver);
     run("ALTER DATABASE " + dbName + " SET OWNER ROLE " + newOwnerName, driver);
 
-    incremental = incrementalLoadAndVerify(dbName, incremental.lastReplId, replDbName);
+    incremental = incrementalLoadAndVerify(dbName, replDbName);
 
     // Replication done, we need to check if new value is set for existing property
     try {
@@ -1375,7 +1348,6 @@ public void testIncrementalLoad() throws IOException {
         + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
     String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" };
@@ -1399,8 +1371,7 @@ public void testIncrementalLoad() throws IOException {
     run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
     verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data, driver);
 
-    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT * from " + replDbName + ".unptned_late", unptn_data, driverMirror);
 
@@ -1422,7 +1393,7 @@ public void testIncrementalLoad() throws IOException {
         + ".ptned WHERE b=2", driver);
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver);
 
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptn_data_1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptn_data_2, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror);
@@ -1437,7 +1408,6 @@ public void testIncrementalInserts() throws IOException {
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
@@ -1449,8 +1419,7 @@ public void testIncrementalInserts() throws IOException {
     run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
     verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);
 
-    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror);
@@ -1462,7 +1431,7 @@ public void testIncrementalInserts() throws IOException {
     run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver);
     verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver);
 
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned", data_after_ovwrite, driverMirror);
   }
@@ -1508,7 +1477,7 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
     };
     InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventTypeValidator);
     try {
-      incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName);
+      incrementalLoadAndVerify(dbName, replDbName);
       eventTypeValidator.assertInjectionsPerformed(true,false);
     } finally {
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
@@ -1568,7 +1537,7 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
     };
     InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(insertEventRepeater);
     try {
-      incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName);
+      incrementalLoadAndVerify(dbName, replDbName);
       insertEventRepeater.assertInjectionsPerformed(true,false);
     } finally {
       InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
@@ -1593,7 +1562,6 @@ public void testIncrementalInsertToPartition() throws IOException {
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
     String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
@@ -1609,8 +1577,7 @@ public void testIncrementalInsertToPartition() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driver);
     verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driver);
 
-    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
@@ -1623,7 +1590,7 @@ public void testIncrementalInsertToPartition() throws IOException {
     run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')", driver);
     verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite, driver);
 
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=2)", data_after_ovwrite, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=3)", data_after_ovwrite, driverMirror);
@@ -1658,7 +1625,6 @@ public void testInsertToMultiKeyPartition() throws IOException {
         "location", "namelist/year=1980/month=4/day=1", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     verifyRun("SELECT name from " + replDbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror);
     verifyRun("SELECT name from " + replDbName + ".namelist where (day=1) ORDER BY name", ptn_day_1, driverMirror);
@@ -1690,8 +1656,7 @@
     verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
         "location", "namelist/year=1990/month=5/day=25", driver);
 
-    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT name from " + replDbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror);
     verifyRun("SELECT name from " + replDbName + ".namelist where (day=1) ORDER BY name", ptn_day_1_2, driverMirror);
@@ -1711,7 +1676,7 @@
     verifySetup("SELECT name from " + dbName + ".namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite, driver);
     verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_3, driver);
 
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
     verifySetup("SELECT name from " + replDbName + ".namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite, driverMirror);
     verifySetup("SELECT name from " + replDbName + ".namelist ORDER BY name", ptn_data_3, driverMirror);
   }
@@ -1724,7 +1689,6 @@ public void testIncrementalInsertDropUnpartitionedTable() throws IOException {
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
@@ -1736,8 +1700,7 @@ public void testIncrementalInsertDropUnpartitionedTable() throws IOException {
     verifySetup("SELECT a from " + dbName + ".unptned_tmp ORDER BY a", unptn_data, driver);
 
     // Get the last repl ID corresponding to all insert/alter/create events except DROP.
-    Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null);
-    String lastDumpIdWithoutDrop = incrementalDump.lastReplId;
+    Tuple incrementalDump = replDumpDb(dbName);
 
     // Drop all the tables
     run("DROP TABLE " + dbName + ".unptned", driver);
@@ -1746,15 +1709,12 @@ public void testIncrementalInsertDropUnpartitionedTable() throws IOException {
     verifyFail("SELECT * FROM " + dbName + ".unptned_tmp", driver);
 
     // Dump all the events except DROP
-    incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutDrop, replDbName);
-    replDumpId = incrementalDump.lastReplId;
 
     // Need to find the tables and data as drop is not part of this dump
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned_tmp ORDER BY a", unptn_data, driverMirror);
 
     // Dump the drop events and check if tables are getting dropped in target as well
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
 
     verifyFail("SELECT * FROM " + replDbName + ".unptned", driverMirror);
     verifyFail("SELECT * FROM " + replDbName + ".unptned_tmp", driverMirror);
   }
@@ -1767,7 +1727,6 @@ public void testIncrementalInsertDropPartitionedTable() throws IOException {
     run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
     String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
@@ -1789,8 +1748,7 @@ public void testIncrementalInsertDropPartitionedTable() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2, driver);
 
     // Get the last repl ID corresponding to all insert/alter/create events except DROP.
-    Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null);
-    String lastDumpIdWithoutDrop = incrementalDump.lastReplId;
+    Tuple incrementalDump = replDumpDb(dbName);
 
     // Drop all the tables
     run("DROP TABLE " + dbName + ".ptned_tmp", driver);
@@ -1799,8 +1757,7 @@ public void testIncrementalInsertDropPartitionedTable() throws IOException {
     verifyFail("SELECT * FROM " + dbName + ".ptned", driver);
 
     // Replicate all the events except DROP
-    incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutDrop, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     // Need to find the tables and data as drop is not part of this dump
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1809,7 +1766,7 @@ public void testIncrementalInsertDropPartitionedTable() throws IOException {
     verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
 
     // Replicate the drop events and check if tables are getting dropped in target as well
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     verifyFail("SELECT * FROM " + replDbName + ".ptned_tmp", driverMirror);
     verifyFail("SELECT * FROM " + replDbName + ".ptned", driverMirror);
   }
@@ -1822,27 +1779,24 @@ public void testInsertOverwriteOnUnpartitionedTableWithCM() throws IOException {
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     // After INSERT INTO operation, get the last Repl ID
     String[] unptn_data = new String[] { "thirteen" };
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
-    Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null);
-    String insertDumpId = incrementalDump.lastReplId;
+    Tuple incrementalDump = replDumpDb(dbName);
 
     // Insert overwrite on unpartitioned table
     String[] data_after_ovwrite = new String[] { "hundred" };
     run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver);
 
     // Replicate only one INSERT INTO operation on the table.
-    incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, insertDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     // After Load from this dump, all target tables/partitions will have initial set of data but source will have latest data.
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
 
     // Replicate the remaining INSERT OVERWRITE operations on the table.
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     // After load, shall see the overwritten data.
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", data_after_ovwrite, driverMirror); @@ -1856,7 +1810,6 @@ public void testInsertOverwriteOnPartitionedTableWithCM() throws IOException { run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; // INSERT INTO 2 partitions and get the last repl ID String[] ptn_data_1 = new String[] { "fourteen" }; @@ -1865,8 +1818,7 @@ public void testInsertOverwriteOnPartitionedTableWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver); run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')", driver); - Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); - String insertDumpId = incrementalDump.lastReplId; + Tuple incrementalDump = replDumpDb(dbName); // Insert overwrite on one partition with multiple files String[] data_after_ovwrite = new String[] { "hundred" }; @@ -1874,15 +1826,14 @@ public void testInsertOverwriteOnPartitionedTableWithCM() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite, driver); // Replicate only 2 INSERT INTO operations. - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, insertDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); // After Load from this dump, all target tables/partitions will have initial set of data. verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); // Replicate the remaining INSERT OVERWRITE operation on the table. - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); // After load, shall see the overwritten data. 
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); @@ -1907,13 +1858,13 @@ public void testDropPartitionEventWithPartitionOnTimestampColumn() throws IOExce run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=\"" + ptnVal +"\") values('" + ptn_data[0] + "')", driver); // Replicate insert event and verify - Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".ptned where (b=\"" + ptnVal + "\") ORDER BY a", ptn_data, driverMirror); run("ALTER TABLE " + dbName + ".ptned DROP PARTITION(b=\"" + ptnVal + "\")", driver); // Replicate drop partition event and verify - incrementalLoadAndVerify(dbName, incrDump.lastReplId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList(ptnVal)), metaStoreClientMirror); } @@ -1936,13 +1887,13 @@ public void testWithStringPartitionSpecialChars() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(p=\"" + ptnVal[1] +"\") values('" + ptn_data[1] + "')", driver); // Replicate insert event and verify - Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT p from " + replDbName + ".ptned ORDER BY p desc", ptnVal, driverMirror); run("ALTER TABLE " + dbName + ".ptned DROP PARTITION(p=\"" + ptnVal[0] + "\")", driver); // Replicate drop partition event and verify - incrementalLoadAndVerify(dbName, incrDump.lastReplId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList(ptnVal[0])), metaStoreClientMirror); } @@ -1956,7 +1907,6 @@ public void testRenameTableWithCM() throws IOException { run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "ten", "twenty" }; String[] ptn_data_1 = new String[] { "fifteen", "fourteen" }; @@ -1973,20 +1923,18 @@ public void testRenameTableWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')", driver); // Get the last repl ID corresponding to all insert events except RENAME. 
-    Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null);
-    String lastDumpIdWithoutRename = incrementalDump.lastReplId;
+    Tuple incrementalDump = replDumpDb(dbName);
 
     run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_renamed", driver);
     run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
 
-    incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutRename, replDbName);
-    replDumpId = incrementalDump.lastReplId;
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
 
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
+    incrementalLoadAndVerify(dbName, replDbName);
 
     verifyFail("SELECT a from " + replDbName + ".unptned ORDER BY a", driverMirror);
     verifyFail("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned_renamed ORDER BY a", unptn_data, driverMirror);
@@ -2002,7 +1950,6 @@ public void testRenamePartitionWithCM() throws IOException {
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
 
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
 
     String[] empty = new String[] {};
     String[] ptn_data_1 = new String[] { "fifteen", "fourteen" };
@@ -2016,18 +1963,16 @@ public void testRenamePartitionWithCM() throws IOException {
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')", driver);
 
     // Get the last repl ID corresponding to all insert events except RENAME.
- Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); - String lastDumpIdWithoutRename = incrementalDump.lastReplId; + Tuple incrementalDump = replDumpDb(dbName); run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=10)", driver); - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutRename, replDbName); - replDumpId = incrementalDump.lastReplId; + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=10) ORDER BY a", empty, driverMirror); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=10) ORDER BY a", ptn_data_2, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", empty, driverMirror); @@ -2060,8 +2005,8 @@ public void testRenameTableAcrossDatabases() throws IOException { verifyFail("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver); - incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1); - incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2); + incrementalLoadAndVerify(dbName1, replDbName1); + incrementalLoadAndVerify(dbName2, replDbName2); verifyIfTableNotExist(replDbName1, "unptned_renamed", metaStoreClientMirror); verifyIfTableNotExist(replDbName2, "unptned_renamed", metaStoreClientMirror); @@ -2095,8 +2040,8 @@ public void testRenamePartitionedTableAcrossDatabases() throws IOException { verifyFail("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver); - incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1); - incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2); + incrementalLoadAndVerify(dbName1, replDbName1); + incrementalLoadAndVerify(dbName2, replDbName2); verifyIfTableNotExist(replDbName1, "ptned_renamed", metaStoreClientMirror); verifyIfTableNotExist(replDbName2, "ptned_renamed", metaStoreClientMirror); @@ -2145,7 +2090,6 @@ public void testViewsReplication() throws IOException { //verifySetup("SELECT a from " + dbName + ".mat_view", ptn_data_1, driver); Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; // view is referring to old database, so no data verifyRun("SELECT * from " + replDbName + ".virtual_view", empty, driverMirror); @@ -2161,8 +2105,7 @@ public void testViewsReplication() throws IOException { //verifySetup("SELECT * from " + dbName + ".mat_view2", unptn_data, driver); // Perform REPL-DUMP/LOAD - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where b=1", ptn_data_1, driverMirror); @@ -2178,8 +2121,7 @@ public void testViewsReplication() throws IOException { verifySetup("SELECT * from " + dbName + ".virtual_view_rename", unptn_data, driver); // Perform REPL-DUMP/LOAD - incrementalDump = 
incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT * from " + replDbName + ".virtual_view_rename", empty, driverMirror); // Test "alter table" with schema change @@ -2187,8 +2129,7 @@ public void testViewsReplication() throws IOException { verifySetup("SHOW COLUMNS FROM " + dbName + ".virtual_view_rename", new String[] {"a", "a_"}, driver); // Perform REPL-DUMP/LOAD - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SHOW COLUMNS FROM " + replDbName + ".virtual_view_rename", new String[] {"a", "a_"}, driverMirror); // Test "DROP VIEW" @@ -2196,7 +2137,7 @@ public void testViewsReplication() throws IOException { verifyIfTableNotExist(dbName, "virtual_view", metaStoreClient); // Perform REPL-DUMP/LOAD - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyIfTableNotExist(replDbName, "virtual_view", metaStoreClientMirror); } @@ -2207,7 +2148,7 @@ public void testDumpLimit() throws IOException { String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - Tuple bootstrapDump = replDumpDb(dbName, null, null, null); + Tuple bootstrapDump = replDumpDb(dbName); String replDumpId = bootstrapDump.lastReplId; String replDumpLocn = bootstrapDump.dumpLocation; @@ -2217,12 +2158,12 @@ public void testDumpLimit() throws IOException { // x events to insert, last repl ID: replDumpId+x run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); - String firstInsertLastReplId = replDumpDb(dbName, replDumpId, null, null).lastReplId; + String firstInsertLastReplId = replDumpDb(dbName).lastReplId; Integer numOfEventsIns1 = Integer.valueOf(firstInsertLastReplId) - Integer.valueOf(replDumpId); // x events to insert, last repl ID: replDumpId+2x run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); - String secondInsertLastReplId = replDumpDb(dbName, firstInsertLastReplId, null, null).lastReplId; + String secondInsertLastReplId = replDumpDb(dbName).lastReplId; Integer numOfEventsIns2 = Integer.valueOf(secondInsertLastReplId) - Integer.valueOf(firstInsertLastReplId); // x events to insert, last repl ID: replDumpId+3x @@ -2231,20 +2172,15 @@ public void testDumpLimit() throws IOException { run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, null, numOfEventsIns1.toString(), replDbName); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); replDumpId = incrementalDump.lastReplId; verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load1, driverMirror); - - Integer lastReplID = Integer.valueOf(replDumpId); - lastReplID += 1000; - String toReplID = String.valueOf(lastReplID); - - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, toReplID, numOfEventsIns2.toString(), replDbName); - replDumpId = incrementalDump.lastReplId; + + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from 
" + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); } @@ -2278,7 +2214,6 @@ public void testExchangePartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driver); Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); @@ -2295,8 +2230,7 @@ public void testExchangePartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2)", empty, driver); verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3)", empty, driver); - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=1 and c=1)", empty, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); @@ -2313,7 +2247,7 @@ public void testExchangePartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2, driver); verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2, driver); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=1 and c=1)", empty, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=2)", empty, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=3)", empty, driverMirror); @@ -2330,7 +2264,6 @@ public void testTruncateTable() throws IOException { run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "eleven", "twelve" }; String[] empty = new String[] {}; @@ -2338,22 +2271,20 @@ public void testTruncateTable() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); run("TRUNCATE TABLE " + dbName + ".unptned", driver); verifySetup("SELECT a from " + dbName + ".unptned", empty, driver); - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned", empty, driverMirror); String[] unptn_data_after_ins = new String[] { "thirteen" }; run("INSERT 
INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')", driver); verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins, driver); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_after_ins, driverMirror); } @@ -2403,7 +2334,7 @@ public void testTruncatePartitionedTable() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty, driver); verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty, driver); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifySetup("SELECT a from " + replDbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1, driverMirror); verifySetup("SELECT a from " + replDbName + ".ptned_1 where (b=2)", empty, driverMirror); verifySetup("SELECT a from " + replDbName + ".ptned_2 where (b=10)", empty, driverMirror); @@ -2417,7 +2348,7 @@ public void testTruncateWithCM() throws IOException { String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - Tuple bootstrapDump = replDumpDb(dbName, null, null, null); + Tuple bootstrapDump = replDumpDb(dbName); String replDumpId = bootstrapDump.lastReplId; String replDumpLocn = bootstrapDump.dumpLocation; @@ -2428,19 +2359,19 @@ public void testTruncateWithCM() throws IOException { // x events to insert, last repl ID: replDumpId+x run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); - String firstInsertLastReplId = replDumpDb(dbName, replDumpId, null, null).lastReplId; + String firstInsertLastReplId = replDumpDb(dbName).lastReplId; Integer numOfEventsIns1 = Integer.valueOf(firstInsertLastReplId) - Integer.valueOf(replDumpId); // x events to insert, last repl ID: replDumpId+2x run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - String secondInsertLastReplId = replDumpDb(dbName, firstInsertLastReplId, null, null).lastReplId; + String secondInsertLastReplId = replDumpDb(dbName).lastReplId; Integer numOfEventsIns2 = Integer.valueOf(secondInsertLastReplId) - Integer.valueOf(firstInsertLastReplId); // y event to truncate, last repl ID: replDumpId+2x+y run("TRUNCATE TABLE " + dbName + ".unptned", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty, driver); - String thirdTruncLastReplId = replDumpDb(dbName, secondInsertLastReplId, null, null).lastReplId; + String thirdTruncLastReplId = replDumpDb(dbName).lastReplId; Integer numOfEventsTrunc3 = Integer.valueOf(thirdTruncLastReplId) - Integer.valueOf(secondInsertLastReplId); // x events to insert, last repl ID: replDumpId+3x+y @@ -2450,27 +2381,22 @@ public void testTruncateWithCM() throws IOException { run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); // Dump and load only first insert (1 record) - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, null, numOfEventsIns1.toString(), replDbName); - replDumpId = incrementalDump.lastReplId; + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror); // Dump and load only second insert (2 records) - Integer lastReplID = Integer.valueOf(replDumpId); - lastReplID += 
1000; - String toReplID = String.valueOf(lastReplID); - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, toReplID, numOfEventsIns2.toString(), replDbName); - replDumpId = incrementalDump.lastReplId; + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror); // Dump and load only truncate (0 records) - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, null, numOfEventsTrunc3.toString(), replDbName); + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); replDumpId = incrementalDump.lastReplId; verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror); // Dump and load insert after truncate (1 record) - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load1, driverMirror); } @@ -2495,46 +2421,46 @@ public void testIncrementalRepeatEventOnExistingObject() throws IOException { // INSERT EVENT to unpartitioned table run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); - Tuple replDump = dumpDbFromLastDump(dbName, bootstrapDump); + Tuple replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // ADD_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // INSERT EVENT to partitioned table on existing partition run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // TRUNCATE_PARTITION EVENT on partitioned table run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // TRUNCATE_TABLE EVENT on unpartitioned table run("TRUNCATE TABLE " + dbName + ".unptned", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // CREATE_TABLE EVENT with multiple partitions run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // ADD_CONSTRAINT EVENT run("ALTER TABLE " + dbName + ".unptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // Replicate all the events happened so far - Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror); @@ -2576,76 +2502,76 @@ public void 
testIncrementalRepeatEventOnMissingObject() throws Exception { // INSERT EVENT to unpartitioned table run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); - Tuple replDump = dumpDbFromLastDump(dbName, bootstrapDump); + Tuple replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // ADD_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // INSERT EVENT to partitioned table on existing partition run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // TRUNCATE_PARTITION EVENT on partitioned table run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // TRUNCATE_TABLE EVENT on unpartitioned table run("TRUNCATE TABLE " + dbName + ".unptned", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // CREATE_TABLE EVENT on partitioned table run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // INSERT EVENT to partitioned table with dynamic ADD_PARTITION run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // DROP_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // RENAME_PARTITION EVENT to partitioned table run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // RENAME_TABLE EVENT to unpartitioned table run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // ADD_CONSTRAINT EVENT run("ALTER TABLE " + dbName + ".ptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver); - replDump = dumpDbFromLastDump(dbName, replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // DROP_TABLE EVENT to partitioned table run("DROP TABLE " + dbName + ".ptned_tmp", driver); - replDump = dumpDbFromLastDump(dbName, 
replDump); + replDump = replDumpDb(dbName); incrementalDumpList.add(replDump); // Replicate all the events happened so far - Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName); verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror); verifyIfTableNotExist(replDbName, "ptned_tmp", metaStoreClientMirror); @@ -2691,7 +2617,7 @@ public void testConcatenateTable() throws IOException { verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); // Replicate all the events happened after bootstrap - incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); // migration test is failing as CONCATENATE is not working. Its not creating the merged file. if (!isMigrationTest) { @@ -2724,7 +2650,7 @@ public void testConcatenatePartitionedTable() throws IOException { run("ALTER TABLE " + dbName + ".ptned PARTITION(b=2) CONCATENATE", driver); // Replicate all the events happened so far - incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); // migration test is failing as CONCATENATE is not working. Its not creating the merged file. if (!isMigrationTest) { @@ -2765,7 +2691,7 @@ public void testIncrementalLoadFailAndRetry() throws IOException { // Replicate all the events happened so far. It should fail as the data files missing in // original path and not available in CM as well. - Tuple incrDump = replDumpDb(dbName, bootstrapDump.lastReplId, null, null); + Tuple incrDump = replDumpDb(dbName); verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'", driverMirror); verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror); @@ -2874,7 +2800,7 @@ public void testConstraints() throws IOException { run("CREATE TABLE " + dbName + ".tbl5(a string, b string, foreign key (a, b) references " + dbName + ".tbl4(a, b) disable novalidate)", driver); run("CREATE TABLE " + dbName + ".tbl6(a string, b string not null disable, unique (a) disable)", driver); - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); replDumpId = incrementalDump.lastReplId; String pkName = null; @@ -2904,7 +2830,7 @@ public void testConstraints() throws IOException { run("ALTER TABLE " + dbName + ".tbl5 DROP CONSTRAINT `" + fkName + "`", driver); run("ALTER TABLE " + dbName + ".tbl6 DROP CONSTRAINT `" + nnName + "`", driver); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); try { List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(replDbName , "tbl4")); assertTrue(pks.isEmpty()); @@ -2954,7 +2880,6 @@ public void testRemoveStats() throws IOException { verifySetup("SELECT max(a) from " + dbName + ".ptned where b=1", new String[]{"8"}, driver); Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; verifyRun("SELECT count(*) from " + replDbName + ".unptned", new String[]{"2"}, driverMirror); verifyRun("SELECT count(*) from " + replDbName + ".ptned", new String[]{"3"}, driverMirror); @@ -2970,7 +2895,7 @@ public void testRemoveStats() throws IOException { run("ANALYZE TABLE " + dbName + ".ptned2 partition(b) COMPUTE STATISTICS FOR COLUMNS", driver); run("ANALYZE TABLE " + dbName + ".ptned2 
partition(b) COMPUTE STATISTICS", driver); - incrementalLoadAndVerify(dbName, replDumpId, replDbName); + incrementalLoadAndVerify(dbName, replDbName); verifyRun("SELECT count(*) from " + replDbName + ".unptned2", new String[]{"2"}, driverMirror); verifyRun("SELECT count(*) from " + replDbName + ".ptned2", new String[]{"3"}, driverMirror); verifyRun("SELECT max(a) from " + replDbName + ".unptned2", new String[]{"2"}, driverMirror); @@ -2993,7 +2918,7 @@ public void testDeleteStagingDir() throws IOException { verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); // Perform repl - String replDumpLocn = replDumpDb(dbName, null, null, null).dumpLocation; + String replDumpLocn = replDumpDb(dbName).dumpLocation; // Reset the driver driverMirror.close(); run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); @@ -3037,7 +2962,7 @@ public void testCMConflict() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned values('ten')", driver); // Bootstrap test - Tuple bootstrapDump = replDumpDb(dbName, null, null, null); + Tuple bootstrapDump = replDumpDb(dbName); advanceDumpDir(); run("REPL DUMP " + dbName, driver); String replDumpLocn = bootstrapDump.dumpLocation; @@ -3392,7 +3317,7 @@ public void testMoveOptimizationBootstrap() throws IOException { verifyRun("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driver); String replDbName = dbName + "_replica"; - Tuple dump = replDumpDb(dbName, null, null, null); + Tuple dump = replDumpDb(dbName); run("REPL LOAD " + replDbName + " FROM '" + dump.dumpLocation + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror); @@ -3427,7 +3352,7 @@ public void testMoveOptimizationIncremental() throws IOException { run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * FROM " + dbName + ".unptned", driver); verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver); - Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + Tuple incrementalDump = replDumpDb(dbName); run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); @@ -3445,7 +3370,7 @@ public void testMoveOptimizationIncremental() throws IOException { run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver); verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver); - incrementalDump = replDumpDb(dbName, replDumpId, null, null); + incrementalDump = replDumpDb(dbName); run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); @@ -3497,7 +3422,7 @@ private String verifyAndReturnDbReplStatus(String dbName, String prevReplDumpId, String cmd, String replDbName) throws IOException { run(cmd, driver); - String lastReplDumpId = incrementalLoadAndVerify(dbName, prevReplDumpId, replDbName).lastReplId; + String lastReplDumpId = incrementalLoadAndVerify(dbName, replDbName).lastReplId; assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); return lastReplDumpId; } @@ -3508,7 +3433,7 @@ private String verifyAndReturnTblReplStatus( String replDbName) throws IOException, TException { run(cmd, 
driver); String lastReplDumpId - = incrementalLoadAndVerify(dbName, lastDbReplDumpId, replDbName).lastReplId; + = incrementalLoadAndVerify(dbName, replDbName).lastReplId; verifyRun("REPL STATUS " + replDbName, lastReplDumpId, driverMirror); assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(lastDbReplDumpId)); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java new file mode 100644 index 0000000000..735ea6810e --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService; +import org.apache.hadoop.hive.shims.Utils; +import org.junit.*; + +import java.io.IOException; +import java.util.*; + + +/** + * TestScheduledReplicationScenarios - test scheduled replication . + */ +public class TestScheduledReplicationScenarios extends BaseReplicationScenariosAcidTables { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Map overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_IDLE_SLEEP_TIME.varname, "1s"); + overrides.put(HiveConf.ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_PROGRESS_REPORT_INTERVAL.varname, + "1s"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + internalBeforeClassSetup(overrides, TestScheduledReplicationScenarios.class); + } + + static void internalBeforeClassSetup(Map overrides, + Class clazz) throws Exception { + + conf = new HiveConf(clazz); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." 
+
+  static void internalBeforeClassSetup(Map<String, String> overrides,
+      Class<?> clazz) throws Exception {
+
+    conf = new HiveConf(clazz);
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    Map<String, String> acidEnableConf = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.in.repl.test", "true");
+    }};
+
+    acidEnableConf.putAll(overrides);
+
+    primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "false");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+    }};
+    replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+    replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade");
+    primary.run("drop database if exists " + primaryDbName + "_extra cascade");
+  }
+
+  @Test
+  public void testAcidTablesBootstrapIncr() throws Throwable {
+    // Bootstrap
+    primary.run("use " + primaryDbName)
+        .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc "
+            + "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t1 values(1)")
+        .run("insert into t1 values(2)");
+    try (ScheduledQueryExecutionService schqS =
+        ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) {
+      int next = 0;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("create scheduled query s1 every 10 minutes as repl dump " + primaryDbName);
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(6000);
+      Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), primaryDbName);
+      Path currdumpRoot = new Path(dumpRoot, String.valueOf(next));
+      replica.load(replicatedDbName, currdumpRoot.toString());
+      replica.run("use " + replicatedDbName)
+          .run("show tables like 't1'")
+          .verifyResult("t1")
+          .run("select id from t1 order by id")
+          .verifyResults(new String[]{"1", "2"});
+
+      // First incremental, after bootstrap
+      primary.run("use " + primaryDbName)
+          .run("insert into t1 values(3)")
+          .run("insert into t1 values(4)");
+      next++;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(20000);
+      Path incrdumpRoot = new Path(dumpRoot, String.valueOf(next));
+      replica.load(replicatedDbName, incrdumpRoot.toString());
+      replica.run("use " + replicatedDbName)
+          .run("show tables like 't1'")
+          .verifyResult("t1")
+          .run("select id from t1 order by id")
+          .verifyResults(new String[]{"1", "2", "3", "4"})
+          .run("drop table t1");
+
+    } catch (IOException | InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      primary.run("drop scheduled query s1");
+    }
+  }
+
+  @Test
+  public void testExternalTablesBootstrapIncr() throws Throwable {
+    List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(
+        REPLICA_EXTERNAL_BASE, replica);
+    // Bootstrap
+    primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into t1 values(1)")
+        .run("insert into t1 values(2)");
+    try (ScheduledQueryExecutionService schqS =
+        ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) {
+      int next = 0;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("create scheduled query s1 every 10 minutes as repl dump " + primaryDbName);
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(6000);
+      Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), primaryDbName);
+      Path currdumpRoot = new Path(dumpRoot, String.valueOf(next));
+      replica.load(replicatedDbName, currdumpRoot.toString(), loadWithClause);
+      replica.run("use " + replicatedDbName)
+          .run("show tables like 't1'")
+          .verifyResult("t1")
+          .run("select id from t1 order by id")
+          .verifyResults(new String[]{"1", "2"});
+
+      // First incremental, after bootstrap
+      primary.run("use " + primaryDbName)
+          .run("insert into t1 values(3)")
+          .run("insert into t1 values(4)");
+      next++;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(20000);
+      Path incrdumpRoot = new Path(dumpRoot, String.valueOf(next));
+      replica.load(replicatedDbName, incrdumpRoot.toString(), loadWithClause);
+      replica.run("use " + replicatedDbName)
+          .run("show tables like 't1'")
+          .verifyResult("t1")
+          .run("select id from t1 order by id")
+          .verifyResults(new String[]{"1", "2", "3", "4"})
+          .run("drop table t1");
+
+    } catch (IOException | InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      primary.run("drop scheduled query s1");
+    }
+  }
+}
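The fixed Thread.sleep() waits in both tests are timing-sensitive. One way they could be made less flaky, sketched only with the public Hadoop FileSystem API; the helper name and timeout are ours, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical test helper: poll until the scheduled REPL DUMP has produced
    // its dump directory instead of sleeping for a hard-coded interval.
    static void waitForDumpDir(FileSystem fs, Path dumpDir, long timeoutMs)
        throws IOException, InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (!fs.exists(dumpDir)) {
        if (System.currentTimeMillis() > deadline) {
          throw new IOException("timed out waiting for dump at " + dumpDir);
        }
        Thread.sleep(500); // poll twice a second
      }
    }

A stricter variant would poll for the dump metadata file inside the directory, since that is written when the dump completes; the directory check above is the simpler approximation.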
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 977abb74cc..26f2e4bae1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -75,13 +76,14 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Comparator;
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
@@ -119,18 +121,21 @@ public String getName() {
   public int execute() {
     try {
       Hive hiveDb = getHive();
-      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
-      DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
+      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), work.dbNameOrPattern);
+      Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+      DumpMetaData dmd = new DumpMetaData(currentDumpPath, conf);
       // Initialize ReplChangeManager instance since we will require it to encode file URI.
       ReplChangeManager.getInstance(conf);
       Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
       Long lastReplId;
-      if (work.isBootStrapDump()) {
-        lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot, hiveDb);
+      if (!dumpRoot.getFileSystem(conf).exists(dumpRoot)
+          || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) {
+        lastReplId = bootStrapDump(currentDumpPath, dmd, cmRoot, hiveDb);
       } else {
-        lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb);
+        work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot));
+        lastReplId = incrementalDump(currentDumpPath, dmd, cmRoot, hiveDb);
       }
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)));
+      prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
@@ -139,6 +144,22 @@ public int execute() {
     return 0;
   }
 
+  private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException {
+    FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot);
+    if (statuses.length > 0) {
+      // Sort by modification time so that the most recent dump comes first.
+      Arrays.sort(statuses, new Comparator<FileStatus>() {
+        public int compare(FileStatus f1, FileStatus f2) {
+          return Long.valueOf(f2.getModificationTime()).compareTo(f1.getModificationTime());
+        }
+      });
+      FileStatus recentDump = statuses[0];
+      DumpMetaData dmd = new DumpMetaData(recentDump.getPath(), conf);
+      return dmd.getEventTo().longValue();
+    }
+    return 0L;
+  }
+
   private void prepareReturnValues(List<String> values) throws SemanticException {
     LOG.debug("prepareReturnValues : " + dumpSchema);
     for (String s : values) {
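For readers on Java 8+, the anonymous Comparator in getEventFromPreviousDumpMetadata can be written with a method-reference comparator. A behaviour-equivalent standalone sketch (the helper name is ours, not part of the patch):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Comparator;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Pick the most recently modified previous dump under dumpRoot, or null if
    // none exists (the bootstrap case). Same ordering as the patch's Comparator.
    static FileStatus latestDump(FileSystem fs, Path dumpRoot) throws IOException {
      FileStatus[] statuses = fs.listStatus(dumpRoot);
      if (statuses.length == 0) {
        return null;
      }
      Arrays.sort(statuses,
          Comparator.comparingLong(FileStatus::getModificationTime).reversed());
      return statuses[0];
    }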
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 7bae9ac66d..9b11bae6ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -32,32 +32,25 @@
   final ReplScope replScope;
   final ReplScope oldReplScope;
   final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
-  final Long eventFrom;
   Long eventTo;
-  private Integer maxEventLimit;
+  Long eventFrom;
   static String testInjectDumpDir = null;
+  private Integer maxEventLimit;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
     testInjectDumpDir = dumpDir;
   }
 
   public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope,
-      Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
+      String astRepresentationForErrorMsg,
       String resultTempPath) {
     this.replScope = replScope;
     this.oldReplScope = oldReplScope;
     this.dbNameOrPattern = replScope.getDbName();
-    this.eventFrom = eventFrom;
-    this.eventTo = eventTo;
     this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
-    this.maxEventLimit = maxEventLimit;
     this.resultTempPath = resultTempPath;
   }
 
-  boolean isBootStrapDump() {
-    return eventFrom == null;
-  }
-
   int maxEventLimit() throws Exception {
     if (eventTo < eventFrom) {
       throw new Exception("Invalid event ID input received in TO clause");
@@ -69,6 +62,10 @@ int maxEventLimit() throws Exception {
     return maxEventLimit;
   }
 
+  void setEventFrom(long eventId) {
+    eventFrom = eventId;
+  }
+
   // Override any user specification that changes the last event to be dumped.
   void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception {
     // If we are bootstrapping ACID tables, we need to dump all the events upto the event id at
@@ -77,7 +74,6 @@ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception
     // bootstrampDump() for more details.
     if (bootstrapLastId > 0) {
       eventTo = bootstrapLastId;
-      maxEventLimit = null;
       LoggerFactory.getLogger(this.getClass())
           .debug("eventTo restricted to event id : {} because of bootstrap of ACID tables",
               eventTo);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 810a4c5284..d2e14329ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -71,10 +71,6 @@
   private ReplScope replScope = new ReplScope();
   private ReplScope oldReplScope = null;
 
-  private Long eventFrom;
-  private Long eventTo;
-  private Integer maxEventLimit;
-
   // Base path for REPL LOAD
   private String path;
   // Added conf member to set the REPL command specific config entries without affecting the configs
@@ -207,27 +203,6 @@ private void initReplDump(ASTNode ast) throws HiveException {
         case TOK_REPLACE:
           setOldReplPolicy(currNode);
           break;
-        case TOK_FROM:
-          // TOK_FROM subtree
-          Tree fromNode = currNode;
-          eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
-
-          // Skip the first, which is always required
-          int fromChildIdx = 1;
-          while (fromChildIdx < fromNode.getChildCount()) {
-            if (fromNode.getChild(fromChildIdx).getType() == TOK_TO) {
-              eventTo = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(fromChildIdx + 1).getText()));
-              // Skip the next child, since we already took care of it
-              fromChildIdx++;
-            } else if (fromNode.getChild(fromChildIdx).getType() == TOK_LIMIT) {
-              maxEventLimit = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(fromChildIdx + 1).getText()));
-              // Skip the next child, since we already took care of it
-              fromChildIdx++;
-            }
-            // move to the next child in FROM tree
-            fromChildIdx++;
-          }
-          break;
         default:
           throw new SemanticException("Unrecognized token " + currNode.getType() + " in REPL DUMP statement.");
       }
@@ -263,10 +238,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
         .get(new ReplDumpWork(
             replScope,
             oldReplScope,
-            eventFrom,
-            eventTo,
             ASTErrorUtils.getMsg(ErrorMsg.INVALID_PATH.getMsg(), ast),
-            maxEventLimit,
             ctx.getResFile().toUri().toString()
         ), conf);
     rootTasks.add(replDumpWorkTask);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index aacd29591d..9f90a364a1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -139,9 +139,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
     task.initialize(queryState, null, null, null);
     task.setWork(
-        new ReplDumpWork(replScope, null,
-            Long.MAX_VALUE, Long.MAX_VALUE, "",
-            Integer.MAX_VALUE, "")
+        new ReplDumpWork(replScope, null, "", "")
     );
 
     try {
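Taken together, the grammar, work-object, and task changes move the bootstrap-versus-incremental decision entirely server-side. A hedged caller-side sketch over plain JDBC (the URL and database name are placeholders; the two result columns are the same ones the tests above read):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ReplDumpClient {
      public static void main(String[] args) throws Exception {
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement();
             // No FROM/TO/LIMIT clauses anymore: ReplDumpTask bootstraps when
             // <hive.repl.rootdir>/<db> is empty, otherwise resumes from the
             // previous dump's eventTo recorded in its dump metadata.
             ResultSet rs = stmt.executeQuery("REPL DUMP repl_src_db")) {
          while (rs.next()) {
            // Column 1: dump directory; column 2: last replicated event id.
            System.out.println(rs.getString(1) + "\t" + rs.getString(2));
          }
        }
      }
    }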