diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 22d5876..b59833d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -119,7 +119,7 @@ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private ArrayList lastResults; - private final boolean VERIFY_SETUP_STEPS = true; + private final boolean VERIFY_SETUP_STEPS = false; // if verifySetup is set to true, all the test setup we do will perform additional // verifications as well, which is useful to verify that our setup occurred // correctly when developing and debugging tests. These verifications, however @@ -220,6 +220,20 @@ private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String return dump; } + private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String toReplId, String replDbName) + throws IOException { + Tuple dump = replDumpDb(dbName, fromReplId, toReplId, null); + loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId); + return dump; + } + + private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String toReplId, String limit, + String replDbName) throws IOException { + Tuple dump = replDumpDb(dbName, fromReplId, toReplId, limit); + loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId); + return dump; + } + private Tuple dumpDbFromLastDump(String dbName, Tuple lastDump) throws IOException { return replDumpDb(dbName, lastDump.lastReplId, null, null); } @@ -300,6 +314,7 @@ public void testBasic() throws IOException { public void testBasicWithCM() throws Exception { String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); @@ -320,18 +335,13 @@ public void testBasicWithCM() throws Exception { createTestDataFile(ptn_locn_2, ptn_data_2); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); - run("SELECT * from " + dbName + ".unptned", driver); - verifyResults(unptn_data, driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); - run("SELECT a from " + dbName + ".ptned WHERE b=1", driver); - verifyResults(ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); - run("SELECT a from " + dbName + ".ptned WHERE b=2", driver); - verifyResults(ptn_data_2, driver); - run("SELECT a from " + dbName + ".ptned_empty", driver); - verifyResults(empty, driver); - run("SELECT * from " + dbName + ".unptned_empty", driver); - verifyResults(empty, driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); advanceDumpDir(); run("REPL DUMP " + dbName, driver); @@ -344,30 +354,20 @@ public void testBasicWithCM() throws Exception { // Partition droppped after "repl dump" run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - - run("REPL STATUS " + dbName + "_dupe", driverMirror); - verifyResults(new String[] {replDumpId}, driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); + verifyRun("REPL STATUS " + replDbName, new String[] {replDumpId}, driverMirror); - run("SELECT * from " + dbName + "_dupe.unptned", driverMirror); - verifyResults(unptn_data, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror); - verifyResults(ptn_data_1, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror); - verifyResults(ptn_data_2, driverMirror); - run("SELECT a from " + dbName + ".ptned_empty", driverMirror); - verifyResults(empty, driverMirror); - run("SELECT * from " + dbName + ".unptned_empty", driverMirror); - verifyResults(empty, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); } @Test public void testBootstrapLoadOnExistingDb() throws IOException { String testName = "bootstrapLoadOnExistingDb"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); String[] unptn_data = new String[]{ "eleven" , "twelve" }; @@ -380,15 +380,9 @@ public void testBootstrapLoadOnExistingDb() throws IOException { // Create an empty database to load run("CREATE DATABASE " + dbName + "_empty", driverMirror); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); // Load to an empty database - run("REPL LOAD " + dbName + "_empty FROM '" + replDumpLocn + "'", driverMirror); - - // REPL STATUS should return same repl ID as dump - verifyRun("REPL STATUS " + dbName + "_empty", replDumpId, driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, dbName + "_empty"); + String replDumpLocn = bootstrapDump.dumpLocation; verifyRun("SELECT * from " + dbName + "_empty.unptned", unptn_data, driverMirror); String[] nullReplId = new String[]{ "NULL" }; @@ -417,6 +411,7 @@ public void testBootstrapLoadOnExistingDb() throws IOException { public void testBootstrapWithConcurrentDropTable() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); @@ -465,33 +460,33 @@ public Table apply(@Nullable Table table) { String replDumpLocn = getResult(0, 0, driver); String replDumpId = getResult(0, 1, true, driver); LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); // The ptned table should miss in target as the table was marked virtually as dropped - verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror); - verifyFail("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "ptned", metaStoreClient); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyFail("SELECT a from " + replDbName + ".ptned WHERE b=1", driverMirror); + verifyIfTableNotExist(replDbName + "", "ptned", metaStoreClient); // Verify if Drop table on a non-existing table is idempotent run("DROP TABLE " + dbName + ".ptned", driver); - verifyIfTableNotExist(dbName, "ptned", metaStoreClient); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); String postDropReplDumpLocn = getResult(0,0, driver); String postDropReplDumpId = getResult(0,1,true,driver); LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); - assert(run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", true, driverMirror)); + assert(run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", true, driverMirror)); - verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "ptned", metaStoreClientMirror); - verifyFail("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyIfTableNotExist(replDbName, "ptned", metaStoreClientMirror); + verifyFail("SELECT a from " + replDbName + ".ptned WHERE b=1", driverMirror); } @Test public void testBootstrapWithConcurrentDropPartition() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; @@ -530,33 +525,29 @@ public void testBootstrapWithConcurrentDropPartition() throws IOException { String replDumpLocn = getResult(0, 0, driver); String replDumpId = getResult(0, 1, true, driver); LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); // All partitions should miss in target as it was marked virtually as dropped - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty, driverMirror); - verifyIfPartitionNotExist(dbName + "_dupe", "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror); - verifyIfPartitionNotExist(dbName + "_dupe", "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", empty, driverMirror); + verifyIfPartitionNotExist(replDbName , "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror); // Verify if drop partition on a non-existing partition is idempotent and just a noop. run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)", driver); run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=2)", driver); - verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClient); - verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClient); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", empty, driver); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty, driver); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); String postDropReplDumpLocn = getResult(0,0,driver); String postDropReplDumpId = getResult(0,1,true,driver); LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); - assert(run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", true, driverMirror)); + assert(run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", true, driverMirror)); - verifyIfPartitionNotExist(dbName + "_dupe", "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror); - verifyIfPartitionNotExist(dbName + "_dupe", "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty, driverMirror); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", empty, driverMirror); } @Test @@ -621,7 +612,7 @@ public void run() { // The ptned table should be there in both source and target as rename was not successful verifyRun("SELECT a from " + dbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driver); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror); // Verify if Rename after bootstrap is successful run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)", driver); @@ -688,25 +679,20 @@ public void run() { incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName); verifyIfTableNotExist(replDbName, "ptned", metaStoreClientMirror); - } @Test public void testIncrementalAdds() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); - LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); String[] unptn_data = new String[]{ "eleven" , "twelve" }; String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; @@ -721,8 +707,8 @@ public void testIncrementalAdds() throws IOException { createTestDataFile(ptn_locn_1, ptn_data_1); createTestDataFile(ptn_locn_2, ptn_data_2); - verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); - verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); // Now, we load data into the tables, and see if an incremental // repl drop/load can duplicate it. @@ -744,47 +730,32 @@ public void testIncrementalAdds() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0,0,driver); - String incrementalDumpId = getResult(0,1,true,driver); - LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); - - run("REPL STATUS " + dbName + "_dupe", driverMirror); - verifyResults(new String[] {incrementalDumpId}, driverMirror); + incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); // VERIFY tables and partitions on destination for equivalence. + verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_empty", empty, driverMirror); - verifyRun("SELECT * from " + dbName + "_dupe.unptned_empty", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptn_data, driverMirror); -// verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data); - // TODO :this does not work because LOAD DATA LOCAL INPATH into an unptned table seems - // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after - // fixing that. - verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2, driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptn_data_2, driverMirror); } @Test public void testIncrementalLoadWithVariableLengthEventId() throws IOException, TException { String testName = "incrementalLoadWithVariableLengthEventId"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("INSERT INTO TABLE " + dbName + ".unptned values('ten')", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; // CREATE_TABLE - TRUNCATE - INSERT - The result is just one record. // Creating dummy table to control the event ID of TRUNCATE not to be 10 or 100 or 1000... @@ -873,15 +844,15 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event String incrementalDumpLocn = getResult(0, 0, driver); String incrementalDumpId = getResult(0, 1, true, driver); LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDumpLocn + "'", driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); } @Test public void testDrops() throws IOException { - String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE", driver); @@ -918,23 +889,16 @@ public void testDrops() throws IOException { // At this point, we've set up all the tables and ptns we're going to test drops across // Replicate it first, and then we'll drop it on the source. - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId}, driverMirror); - - verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", ptn_data_1, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=2", ptn_data_2, driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='1'", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='2'", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2 WHERE b='1'", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2 WHERE b='2'", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned3 WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned3 WHERE b=2", ptn_data_2, driverMirror); // All tables good on destination, drop on source. - run("DROP TABLE " + dbName + ".unptned", driver); run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')", driver); run("DROP TABLE " + dbName + ".ptned2", driver); @@ -945,33 +909,27 @@ public void testDrops() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2, driver); // replicate the incremental drops - - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String postDropReplDumpLocn = getResult(0,0,driver); - String postDropReplDumpId = getResult(0,1,true,driver); - LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror); + incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); // verify that drops were replicated. This can either be from tables or ptns // not existing, and thus, throwing a NoSuchObjectException, or returning nulls // or select * returning empty, depending on what we're testing. - verifyIfTableNotExist(dbName + "_dupe", "unptned", metaStoreClientMirror); + verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned3", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='2'", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned3 WHERE b=1", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned3", ptn_data_2, driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "ptned2", metaStoreClientMirror); + verifyIfTableNotExist(replDbName, "ptned2", metaStoreClientMirror); } @Test public void testDropsWithCM() throws IOException { - String testName = "drops_with_cm"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE", driver); @@ -990,57 +948,35 @@ public void testDropsWithCM() throws IOException { createTestDataFile(ptn_locn_2, ptn_data_2); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); - run("SELECT * from " + dbName + ".unptned", driver); - verifyResults(unptn_data, driver); + verifySetup("SELECT * from " + dbName + ".unptned",unptn_data, driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')", driver); - run("SELECT a from " + dbName + ".ptned WHERE b='1'", driver); - verifyResults(ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1, driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')", driver); - run("SELECT a from " + dbName + ".ptned WHERE b='2'", driver); - verifyResults(ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2, driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')", driver); - run("SELECT a from " + dbName + ".ptned2 WHERE b='1'", driver); - verifyResults(ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1, driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')", driver); - run("SELECT a from " + dbName + ".ptned2 WHERE b='2'", driver); - verifyResults(ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - - run("REPL STATUS " + dbName + "_dupe", driverMirror); - verifyResults(new String[] {replDumpId}, driverMirror); - - run("SELECT * from " + dbName + "_dupe.unptned", driverMirror); - verifyResults(unptn_data, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", driverMirror); - verifyResults(ptn_data_1, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", driverMirror); - verifyResults(ptn_data_2, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", driverMirror); - verifyResults(ptn_data_1, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", driverMirror); - verifyResults(ptn_data_2, driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='1'", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='2'", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2 WHERE b='1'", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2 WHERE b='2'", ptn_data_2, driverMirror); run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned", driver); run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned", driver); run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM " + dbName + ".ptned WHERE b='1'", driver); - run("SELECT a from " + dbName + ".unptned_copy", driver); - verifyResults(unptn_data, driver); - run("SELECT a from " + dbName + ".ptned_copy", driver); - verifyResults(ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".unptned_copy", unptn_data, driver); + verifySetup("SELECT a from " + dbName + ".ptned_copy", ptn_data_1, driver); run("DROP TABLE " + dbName + ".unptned", driver); run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')", driver); run("DROP TABLE " + dbName + ".ptned2", driver); - run("SELECT a from " + dbName + ".ptned WHERE b=2", driver); - verifyResults(empty, driver); - run("SELECT a from " + dbName + ".ptned", driver); - verifyResults(ptn_data_1, driver); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); @@ -1053,11 +989,11 @@ public void testDropsWithCM() throws IOException { // Drop partition after dump run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')", driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", driverMirror); Exception e = null; try { - Table tbl = metaStoreClientMirror.getTable(dbName + "_dupe", "unptned"); + Table tbl = metaStoreClientMirror.getTable(replDbName, "unptned"); assertNull(tbl); } catch (TException te) { e = te; @@ -1065,24 +1001,20 @@ public void testDropsWithCM() throws IOException { assertNotNull(e); assertEquals(NoSuchObjectException.class, e.getClass()); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror); - verifyResults(empty, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned", driverMirror); - verifyResults(ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned", ptn_data_1, driverMirror); - verifyIfTableNotExist(dbName +"_dupe", "ptned2", metaStoreClientMirror); + verifyIfTableNotExist(replDbName, "ptned2", metaStoreClientMirror); - run("SELECT a from " + dbName + "_dupe.unptned_copy", driverMirror); - verifyResults(unptn_data, driverMirror); - run("SELECT a from " + dbName + "_dupe.ptned_copy", driverMirror); - verifyResults(ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_copy", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_copy", ptn_data_1, driverMirror); } @Test public void testTableAlters() throws IOException { - String testName = "TableAlters"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE", driver); @@ -1116,22 +1048,14 @@ public void testTableAlters() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2, driver); // base tables set up, let's replicate them over + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - - run("REPL STATUS " + dbName + "_dupe", driverMirror); - verifyResults(new String[] {replDumpId}, driverMirror); - - verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror); - verifySetup("SELECT * from " + dbName + "_dupe.unptned2", unptn_data, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned2", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='1'", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b='2'", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2 WHERE b='1'", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2 WHERE b='2'", ptn_data_2, driverMirror); // tables have been replicated over, and verified to be identical. Now, we do a couple of // alters on the source @@ -1189,39 +1113,33 @@ public void testTableAlters() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2, driver); // All alters done, now we replicate them over. - - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String postAlterReplDumpLocn = getResult(0,0,driver); - String postAlterReplDumpId = getResult(0,1,true,driver); - LOG.info("Dumped to {} with id {}->{}", postAlterReplDumpLocn, replDumpId, postAlterReplDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'", driverMirror); + incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); // Replication done, we now do the following verifications: // verify that unpartitioned table rename succeeded. - verifyIfTableNotExist(dbName + "_dupe", "unptned", metaStoreClientMirror); - verifyRun("SELECT * from " + dbName + "_dupe.unptned_rn", unptn_data, driverMirror); + verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_rn", unptn_data, driverMirror); // verify that partition rename succeded. try { - Table unptn2 = metaStoreClientMirror.getTable(dbName + "_dupe" , "unptned2"); + Table unptn2 = metaStoreClientMirror.getTable(replDbName, "unptned2"); assertTrue(unptn2.getParameters().containsKey(testKey)); assertEquals(testVal,unptn2.getParameters().get(testKey)); } catch (TException te) { assertNull(te); } - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=22", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=22", ptn_data_2, driverMirror); // verify that ptned table rename succeded. - verifyIfTableNotExist(dbName + "_dupe", "ptned2", metaStoreClientMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2", ptn_data_2, driverMirror); + verifyIfTableNotExist(replDbName, "ptned2", metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName + ".ptned2_rn WHERE b=2", ptn_data_2, driverMirror); // verify that ptned table property set worked try { - Table ptned = metaStoreClientMirror.getTable(dbName + "_dupe" , "ptned"); + Table ptned = metaStoreClientMirror.getTable(replDbName, "ptned"); assertTrue(ptned.getParameters().containsKey(testKey)); assertEquals(testVal, ptned.getParameters().get(testKey)); } catch (TException te) { @@ -1232,18 +1150,16 @@ public void testTableAlters() throws IOException { try { List ptnVals1 = new ArrayList(); ptnVals1.add("1"); - Partition ptn1 = metaStoreClientMirror.getPartition(dbName + "_dupe", "ptned", ptnVals1); + Partition ptn1 = metaStoreClientMirror.getPartition(replDbName, "ptned", ptnVals1); assertTrue(ptn1.getParameters().containsKey(testKey)); assertEquals(testVal,ptn1.getParameters().get(testKey)); } catch (TException te) { assertNull(te); } - } @Test public void testDatabaseAlters() throws IOException { - String testName = "DatabaseAlters"; String dbName = createDB(testName, driver); String replDbName = dbName + "_dupe"; @@ -1302,6 +1218,7 @@ public void testDatabaseAlters() throws IOException { public void testIncrementalLoad() throws IOException { String testName = "incrementalLoad"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); @@ -1309,12 +1226,8 @@ public void testIncrementalLoad() throws IOException { run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0,driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "eleven", "twelve" }; String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" }; @@ -1338,14 +1251,10 @@ public void testIncrementalLoad() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data, driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptn_data, driverMirror); run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName @@ -1365,32 +1274,22 @@ public void testIncrementalLoad() throws IOException { + ".ptned WHERE b=2", driver); verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); } @Test public void testIncrementalInserts() throws IOException { String testName = "incrementalInserts"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "eleven", "twelve" }; @@ -1402,17 +1301,11 @@ public void testIncrementalInserts() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - verifyRun("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data, driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror); String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" }; String[] data_after_ovwrite = new String[] { "hundred" }; @@ -1421,17 +1314,9 @@ public void testIncrementalInserts() throws IOException { run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver); verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins, driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned", data_after_ovwrite, driverMirror); } @Test @@ -1545,18 +1430,12 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event @Test public void testIncrementalInsertToPartition() throws IOException { String testName = "incrementalInsertToPartition"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" }; String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" }; @@ -1572,17 +1451,11 @@ public void testIncrementalInsertToPartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driver); verifySetup("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driver); - verifyRun("SELECT a from " + dbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driver); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); String[] data_after_ovwrite = new String[] { "hundred" }; // Insert overwrite on existing partition @@ -1592,25 +1465,18 @@ public void testIncrementalInsertToPartition() throws IOException { run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')", driver); verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2)", data_after_ovwrite, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=3)", data_after_ovwrite, driverMirror); } @Test public void testInsertToMultiKeyPartition() throws IOException { String testName = "insertToMultiKeyPartition"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; - run("CREATE DATABASE " + dbName, driver); run("CREATE TABLE " + dbName + ".namelist(name string) partitioned by (year int, month int, day int) STORED AS TEXTFILE", driver); run("USE " + dbName, driver); @@ -1633,21 +1499,17 @@ public void testInsertToMultiKeyPartition() throws IOException { verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1980,month=4,day=1)", "location", "namelist/year=1980/month=4/day=1", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; - verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1, driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1984 and month=4 and day=1) ORDER BY name", + verifyRun("SELECT name from " + replDbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror); + verifyRun("SELECT name from " + replDbName + ".namelist where (day=1) ORDER BY name", ptn_day_1, driverMirror); + verifyRun("SELECT name from " + replDbName + ".namelist where (year=1984 and month=4 and day=1) ORDER BY name", ptn_year_1984_month_4_day_1_1, driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_1, driverMirror); - verifyRun("SHOW PARTITIONS " + dbName + "_dupe.namelist", ptn_list_1, driverMirror); + verifyRun("SELECT name from " + replDbName + ".namelist ORDER BY name", ptn_data_1, driverMirror); + verifyRun("SHOW PARTITIONS " + replDbName + ".namelist", ptn_list_1, driverMirror); - run("USE " + dbName + "_dupe", driverMirror); + run("USE " + replDbName, driverMirror); verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1980,month=4,day=1)", "location", "namelist/year=1980/month=4/day=1", driverMirror); run("USE " + dbName, driver); @@ -1670,20 +1532,16 @@ public void testInsertToMultiKeyPartition() throws IOException { verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)", "location", "namelist/year=1990/month=5/day=25", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist where (day=1) ORDER BY name", ptn_day_1_2, driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist where (year=1984 and month=4 and day=1) ORDER BY name", + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT name from " + replDbName + ".namelist where (year=1980) ORDER BY name", ptn_year_1980, driverMirror); + verifyRun("SELECT name from " + replDbName + ".namelist where (day=1) ORDER BY name", ptn_day_1_2, driverMirror); + verifyRun("SELECT name from " + replDbName + ".namelist where (year=1984 and month=4 and day=1) ORDER BY name", ptn_year_1984_month_4_day_1_2, driverMirror); - verifyRun("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_2, driverMirror); - verifyRun("SHOW PARTITIONS " + dbName + "_dupe.namelist", ptn_list_2, driverMirror); - run("USE " + dbName + "_dupe", driverMirror); + verifyRun("SELECT name from " + replDbName + ".namelist ORDER BY name", ptn_data_2, driverMirror); + verifyRun("SHOW PARTITIONS " + replDbName + ".namelist", ptn_list_2, driverMirror); + run("USE " + replDbName, driverMirror); verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)", "location", "namelist/year=1990/month=5/day=25", driverMirror); run("USE " + dbName, driverMirror); @@ -1695,30 +1553,20 @@ public void testInsertToMultiKeyPartition() throws IOException { verifySetup("SELECT name from " + dbName + ".namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite, driver); verifySetup("SELECT name from " + dbName + ".namelist ORDER BY name", ptn_data_3, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifySetup("SELECT name from " + dbName + "_dupe.namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite, driverMirror); - verifySetup("SELECT name from " + dbName + "_dupe.namelist ORDER BY name", ptn_data_3, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifySetup("SELECT name from " + replDbName + ".namelist where (year=1990 and month=5 and day=25)", data_after_ovwrite, driverMirror); + verifySetup("SELECT name from " + replDbName + ".namelist ORDER BY name", ptn_data_3, driverMirror); } @Test public void testIncrementalInsertDropUnpartitionedTable() throws IOException { String testName = "incrementalInsertDropUnpartitionedTable"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "eleven", "twelve" }; @@ -1730,9 +1578,8 @@ public void testIncrementalInsertDropUnpartitionedTable() throws IOException { verifySetup("SELECT a from " + dbName + ".unptned_tmp ORDER BY a", unptn_data, driver); // Get the last repl ID corresponding to all insert/alter/create events except DROP. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String lastDumpIdWithoutDrop = getResult(0, 1, driver); + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String lastDumpIdWithoutDrop = incrementalDump.lastReplId; // Drop all the tables run("DROP TABLE " + dbName + ".unptned", driver); @@ -1741,43 +1588,28 @@ public void testIncrementalInsertDropUnpartitionedTable() throws IOException { verifyFail("SELECT * FROM " + dbName + ".unptned_tmp", driver); // Dump all the events except DROP - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutDrop, replDbName); + replDumpId = incrementalDump.lastReplId; // Need to find the tables and data as drop is not part of this dump - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned_tmp ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_tmp ORDER BY a", unptn_data, driverMirror); // Dump the drop events and check if tables are getting dropped in target as well - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyFail("SELECT * FROM " + dbName + ".unptned", driverMirror); - verifyFail("SELECT * FROM " + dbName + ".unptned_tmp", driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyFail("SELECT * FROM " + replDbName + ".unptned", driverMirror); + verifyFail("SELECT * FROM " + replDbName + ".unptned_tmp", driverMirror); } @Test public void testIncrementalInsertDropPartitionedTable() throws IOException { String testName = "incrementalInsertDropPartitionedTable"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" }; String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" }; @@ -1799,9 +1631,8 @@ public void testIncrementalInsertDropPartitionedTable() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2, driver); // Get the last repl ID corresponding to all insert/alter/create events except DROP. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String lastDumpIdWithoutDrop = getResult(0, 1, driver); + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String lastDumpIdWithoutDrop = incrementalDump.lastReplId; // Drop all the tables run("DROP TABLE " + dbName + ".ptned_tmp", driver); @@ -1809,99 +1640,65 @@ public void testIncrementalInsertDropPartitionedTable() throws IOException { verifyFail("SELECT * FROM " + dbName + ".ptned_tmp", driver); verifyFail("SELECT * FROM " + dbName + ".ptned", driver); - // Dump all the events except DROP - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutDrop, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + // Replicate all the events except DROP + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutDrop, replDbName); + replDumpId = incrementalDump.lastReplId; // Need to find the tables and data as drop is not part of this dump - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror); - - // Dump the drop events and check if tables are getting dropped in target as well - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyFail("SELECT * FROM " + dbName + ".ptned_tmp", driverMirror); - verifyFail("SELECT * FROM " + dbName + ".ptned", driverMirror); + // Replicate the drop events and check if tables are getting dropped in target as well + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyFail("SELECT * FROM " + replDbName + ".ptned_tmp", driverMirror); + verifyFail("SELECT * FROM " + replDbName + ".ptned", driverMirror); } @Test public void testInsertOverwriteOnUnpartitionedTableWithCM() throws IOException { String testName = "insertOverwriteOnUnpartitionedTableWithCM"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; // After INSERT INTO operation, get the last Repl ID String[] unptn_data = new String[] { "thirteen" }; run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String insertDumpId = getResult(0, 1, false, driver); + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String insertDumpId = incrementalDump.lastReplId; // Insert overwrite on unpartitioned table String[] data_after_ovwrite = new String[] { "hundred" }; run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver); - // Dump only one INSERT INTO operation on the table. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + insertDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + // Replicate only one INSERT INTO operation on the table. + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, insertDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; // After Load from this dump, all target tables/partitions will have initial set of data but source will have latest data. - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); - // Dump the remaining INSERT OVERWRITE operations on the table. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + // Replicate the remaining INSERT OVERWRITE operations on the table. + incrementalLoadAndVerify(dbName, replDumpId, replDbName); // After load, shall see the overwritten data. - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", data_after_ovwrite, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", data_after_ovwrite, driverMirror); } @Test public void testInsertOverwriteOnPartitionedTableWithCM() throws IOException { String testName = "insertOverwriteOnPartitionedTableWithCM"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; // INSERT INTO 2 partitions and get the last repl ID String[] ptn_data_1 = new String[] { "fourteen" }; @@ -1909,38 +1706,29 @@ public void testInsertOverwriteOnPartitionedTableWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')", driver); run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver); run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')", driver); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String insertDumpId = getResult(0, 1, false, driver); + + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String insertDumpId = incrementalDump.lastReplId; // Insert overwrite on one partition with multiple files String[] data_after_ovwrite = new String[] { "hundred" }; run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')", driver); verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite, driver); - // Dump only 2 INSERT INTO operations. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + insertDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + // Replicate only 2 INSERT INTO operations. + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, insertDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; // After Load from this dump, all target tables/partitions will have initial set of data. - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); - // Dump the remaining INSERT OVERWRITE operation on the table. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + // Replicate the remaining INSERT OVERWRITE operation on the table. + incrementalLoadAndVerify(dbName, replDumpId, replDbName); // After load, shall see the overwritten data. - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", data_after_ovwrite, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", data_after_ovwrite, driverMirror); } @Test @@ -2003,19 +1791,14 @@ public void testWithStringPartitionSpecialChars() throws IOException { @Test public void testRenameTableWithCM() throws IOException { String testName = "renameTableWithCM"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; - run("CREATE DATABASE " + dbName, driver); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "ten", "twenty" }; String[] ptn_data_1 = new String[] { "fifteen", "fourteen" }; @@ -2032,55 +1815,36 @@ public void testRenameTableWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')", driver); // Get the last repl ID corresponding to all insert events except RENAME. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String lastDumpIdWithoutRename = getResult(0, 1, driver); + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String lastDumpIdWithoutRename = incrementalDump.lastReplId; run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_renamed", driver); run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutRename, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutRename, replDbName); + replDumpId = incrementalDump.lastReplId; - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyFail("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", driverMirror); - verifyFail("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned_renamed ORDER BY a", unptn_data, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_renamed where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_renamed where (b=2) ORDER BY a", ptn_data_2, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyFail("SELECT a from " + replDbName + ".unptned ORDER BY a", driverMirror); + verifyFail("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_renamed ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_renamed where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_renamed where (b=2) ORDER BY a", ptn_data_2, driverMirror); } @Test public void testRenamePartitionWithCM() throws IOException { String testName = "renamePartitionWithCM"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] empty = new String[] {}; String[] ptn_data_1 = new String[] { "fifteen", "fourteen" }; @@ -2094,35 +1858,21 @@ public void testRenamePartitionWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')", driver); // Get the last repl ID corresponding to all insert events except RENAME. - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String lastDumpIdWithoutRename = getResult(0, 1, driver); + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String lastDumpIdWithoutRename = incrementalDump.lastReplId; run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=10)", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutRename, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=10) ORDER BY a", empty, driverMirror); - - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, lastDumpIdWithoutRename, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=10) ORDER BY a", empty, driverMirror); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=10) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", empty, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=10) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", empty, driverMirror); } @Test @@ -2199,6 +1949,7 @@ public void testRenamePartitionedTableAcrossDatabases() throws IOException { public void testViewsReplication() throws IOException { String testName = "viewsReplication"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); @@ -2235,16 +1986,12 @@ public void testViewsReplication() throws IOException { //run("CREATE MATERIALIZED VIEW " + dbName + ".mat_view AS SELECT a FROM " + dbName + ".ptned where b=1", driver); //verifySetup("SELECT a from " + dbName + ".mat_view", ptn_data_1, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); - LOG.info("Bootstrap-dump: Dumped to {} with id {}",replDumpLocn,replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; // view is referring to old database, so no data - verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", empty, driverMirror); - //verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1, driverMirror); + verifyRun("SELECT * from " + replDbName + ".virtual_view", empty, driverMirror); + //verifyRun("SELECT a from " + replDbName + ".mat_view", ptn_data_1, driverMirror); run("CREATE VIEW " + dbName + ".virtual_view2 AS SELECT a FROM " + dbName + ".ptned where b=2", driver); verifySetup("SELECT a from " + dbName + ".virtual_view2", ptn_data_2, driver); @@ -2256,76 +2003,55 @@ public void testViewsReplication() throws IOException { //verifySetup("SELECT * from " + dbName + ".mat_view2", unptn_data, driver); // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0,0,driver); - String incrementalDumpId = getResult(0,1,true,driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); - - run("REPL STATUS " + dbName + "_dupe", driverMirror); - verifyResults(new String[] {incrementalDumpId}, driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; - verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned where b=1", ptn_data_1, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned where b=1", ptn_data_1, driverMirror); // view is referring to old database, so no data - verifyRun("SELECT * from " + dbName + "_dupe.virtual_view", empty, driverMirror); - //verifyRun("SELECT a from " + dbName + "_dupe.mat_view", ptn_data_1, driverMirror); + verifyRun("SELECT * from " + replDbName + ".virtual_view", empty, driverMirror); + //verifyRun("SELECT a from " + replDbName + ".mat_view", ptn_data_1, driverMirror); // view is referring to old database, so no data - verifyRun("SELECT * from " + dbName + "_dupe.virtual_view2", empty, driverMirror); - //verifyRun("SELECT * from " + dbName + "_dupe.mat_view2", unptn_data, driverMirror); + verifyRun("SELECT * from " + replDbName + ".virtual_view2", empty, driverMirror); + //verifyRun("SELECT * from " + replDbName + ".mat_view2", unptn_data, driverMirror); // Test "alter table" with rename run("ALTER VIEW " + dbName + ".virtual_view RENAME TO " + dbName + ".virtual_view_rename", driver); verifySetup("SELECT * from " + dbName + ".virtual_view_rename", unptn_data, driver); // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT * from " + dbName + "_dupe.virtual_view_rename", empty, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT * from " + replDbName + ".virtual_view_rename", empty, driverMirror); // Test "alter table" with schema change run("ALTER VIEW " + dbName + ".virtual_view_rename AS SELECT a, concat(a, '_') as a_ FROM " + dbName + ".unptned", driver); verifySetup("SHOW COLUMNS FROM " + dbName + ".virtual_view_rename", new String[] {"a", "a_"}, driver); // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SHOW COLUMNS FROM " + dbName + "_dupe.virtual_view_rename", new String[] {"a", "a_"}, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SHOW COLUMNS FROM " + replDbName + ".virtual_view_rename", new String[] {"a", "a_"}, driverMirror); // Test "DROP VIEW" run("DROP VIEW " + dbName + ".virtual_view", driver); verifyIfTableNotExist(dbName, "virtual_view", metaStoreClient); // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "virtual_view", metaStoreClientMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyIfTableNotExist(replDbName, "virtual_view", metaStoreClientMirror); } @Test public void testDumpLimit() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + Tuple bootstrapDump = replDumpDb(dbName, null, null, null); + String replDumpId = bootstrapDump.lastReplId; + String replDumpLocn = bootstrapDump.dumpLocation; String[] unptn_data = new String[] { "eleven", "thirteen", "twelve" }; String[] unptn_data_load1 = new String[] { "eleven" }; @@ -2345,51 +2071,31 @@ public void testDumpLimit() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[2] + "')", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT " + numOfEventsIns1, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, null, numOfEventsIns1.toString(), replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load1, driverMirror); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror); - - advanceDumpDir(); Integer lastReplID = Integer.valueOf(replDumpId); lastReplID += 1000; String toReplID = String.valueOf(lastReplID); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT " + numOfEventsIns2, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, toReplID, numOfEventsIns2.toString(), replDbName); + replDumpId = incrementalDump.lastReplId; - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); } @Test public void testExchangePartition() throws IOException { String testName = "exchangePartition"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; - run("CREATE DATABASE " + dbName, driver); run("CREATE TABLE " + dbName + ".ptned_src(a string) partitioned by (b int, c int) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned_dest(a string) partitioned by (b int, c int) STORED AS TEXTFILE", driver); @@ -2413,21 +2119,14 @@ public void testExchangePartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driver); verifySetup("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1, driver); - verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driver); - verifyRun("SELECT a from " + dbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driver); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty, driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=1 and c=1)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=2 and c=2)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=2 and c=3)", empty, driverMirror); // Exchange single partitions using complete partition-spec (all partition columns) run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=1, c=1) WITH TABLE " + dbName + ".ptned_src", driver); @@ -2438,19 +2137,14 @@ public void testExchangePartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2)", empty, driver); verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3)", empty, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3)", empty, driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=1 and c=1)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=2 and c=2)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=2 and c=3)", empty, driverMirror); // Exchange multiple partitions using partial partition-spec (only one partition column) run("ALTER TABLE " + dbName + ".ptned_dest EXCHANGE PARTITION (b=2) WITH TABLE " + dbName + ".ptned_src", driver); @@ -2461,88 +2155,56 @@ public void testExchangePartition() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2, driver); verifySetup("SELECT a from " + dbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=1 and c=1)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=2)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_src where (b=2 and c=3)", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=1 and c=1)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=2)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_src where (b=2 and c=3)", empty, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=1 and c=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=2 and c=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_dest where (b=2 and c=3) ORDER BY a", ptn_data_2, driverMirror); } @Test public void testTruncateTable() throws IOException { String testName = "truncateTable"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; String[] unptn_data = new String[] { "eleven", "twelve" }; String[] empty = new String[] {}; run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); - verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data, driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); run("TRUNCATE TABLE " + dbName + ".unptned", driver); verifySetup("SELECT a from " + dbName + ".unptned", empty, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".unptned", empty, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".unptned", empty, driverMirror); String[] unptn_data_after_ins = new String[] { "thirteen" }; run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')", driver); verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins, driver); - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_after_ins, driverMirror); } @Test public void testTruncatePartitionedTable() throws IOException { String testName = "truncatePartitionedTable"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; - run("CREATE DATABASE " + dbName, driver); run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); @@ -2563,21 +2225,17 @@ public void testTruncatePartitionedTable() throws IOException { run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')", driver); run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')", driver); - verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1, driver); - verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2, driver); - verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1, driver); - verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2, driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2, driverMirror); run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)", driver); verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1, driver); @@ -2587,33 +2245,23 @@ public void testTruncatePartitionedTable() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty, driver); verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty, driverMirror); - verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifySetup("SELECT a from " + replDbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1, driverMirror); + verifySetup("SELECT a from " + replDbName + ".ptned_1 where (b=2)", empty, driverMirror); + verifySetup("SELECT a from " + replDbName + ".ptned_2 where (b=10)", empty, driverMirror); + verifySetup("SELECT a from " + replDbName + ".ptned_2 where (b=20)", empty, driverMirror); } @Test public void testTruncateWithCM() throws IOException { String testName = "truncateWithCM"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + Tuple bootstrapDump = replDumpDb(dbName, null, null, null); + String replDumpId = bootstrapDump.lastReplId; + String replDumpLocn = bootstrapDump.dumpLocation; String[] empty = new String[] {}; String[] unptn_data = new String[] { "eleven", "thirteen" }; @@ -2641,56 +2289,31 @@ public void testTruncateWithCM() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')", driver); verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1, driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); // Dump and load only first insert (1 record) - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT " + numOfEventsIns1, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, null, numOfEventsIns1.toString(), replDbName); + replDumpId = incrementalDump.lastReplId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1, driver); verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror); // Dump and load only second insert (2 records) - advanceDumpDir(); Integer lastReplID = Integer.valueOf(replDumpId); lastReplID += 1000; String toReplID = String.valueOf(lastReplID); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT " + numOfEventsIns2, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, toReplID, numOfEventsIns2.toString(), replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror); // Dump and load only truncate (0 records) - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT " + numOfEventsTrunc3, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty, driverMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, null, numOfEventsTrunc3.toString(), replDbName); + replDumpId = incrementalDump.lastReplId; + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror); // Dump and load insert after truncate (1 record) - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - replDumpId = incrementalDumpId; - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - - verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load1, driverMirror); } @Test @@ -2994,29 +2617,34 @@ public void testIncrementalLoadFailAndRetry() throws IOException { public void testStatus() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String lastReplDumpLocn = getResult(0, 0, driver); - String lastReplDumpId = getResult(0, 1, true, driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'", driverMirror); + String replDbName = dbName + "_dupe"; + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String lastReplDumpId = bootstrapDump.lastReplId; // Bootstrap done, now on to incremental. First, we test db-level REPL LOADs. // Both db-level and table-level repl.last.id must be updated. lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, - "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", + replDbName); lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, - "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)"); + "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", + replDbName); lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, - "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=11)"); + "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=11)", + replDbName); lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, - "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')"); + "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')", + replDbName); lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId, - "ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_rn"); + "ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_rn", + replDbName); lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId, - "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)"); + "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)", + replDbName); lastReplDumpId = verifyAndReturnDbReplStatus(dbName, null, lastReplDumpId, - "DROP TABLE " + dbName + ".ptned_rn"); + "DROP TABLE " + dbName + ".ptned_rn", + replDbName); // DB-level REPL LOADs testing done, now moving on to table level repl loads. // In each of these cases, the table-level repl.last.id must move forward, but the @@ -3025,20 +2653,25 @@ public void testStatus() throws IOException { String lastTblReplDumpId = lastReplDumpId; lastTblReplDumpId = verifyAndReturnTblReplStatus( dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, - "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) STORED AS TEXTFILE"); + "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) STORED AS TEXTFILE", + replDbName); lastTblReplDumpId = verifyAndReturnTblReplStatus( dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, - "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)"); + "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)", + replDbName); lastTblReplDumpId = verifyAndReturnTblReplStatus( dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, - "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION (b=11)"); + "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION (b=11)", + replDbName); lastTblReplDumpId = verifyAndReturnTblReplStatus( dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, - "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')"); + "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')", + replDbName); // Note : Not testing table rename because table rename replication is not supported for table-level repl. String finalTblReplDumpId = verifyAndReturnTblReplStatus( dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, - "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)"); + "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)", + replDbName); /* Comparisons using Strings for event Ids is wrong. This should be numbers since lexical string comparison @@ -3057,30 +2690,24 @@ public void testStatus() throws IOException { @Test public void testConstraints() throws IOException { String testName = "constraints"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName, driver); + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; run("CREATE TABLE " + dbName + ".tbl1(a string, b string, primary key (a, b) disable novalidate rely)", driver); run("CREATE TABLE " + dbName + ".tbl2(a string, b string, foreign key (a, b) references " + dbName + ".tbl1(a, b) disable novalidate)", driver); run("CREATE TABLE " + dbName + ".tbl3(a string, b string not null disable, unique (a) disable)", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0, driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; try { - List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl1")); + List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(replDbName , "tbl1")); assertEquals(pks.size(), 2); - List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(DEFAULT_CATALOG_NAME, dbName+ "_dupe" , "tbl3")); + List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(DEFAULT_CATALOG_NAME, replDbName , "tbl3")); assertEquals(uks.size(), 1); - List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl2")); + List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, replDbName , "tbl2")); assertEquals(fks.size(), 2); - List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(DEFAULT_CATALOG_NAME, dbName+ "_dupe" , "tbl3")); + List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(DEFAULT_CATALOG_NAME, replDbName , "tbl3")); assertEquals(nns.size(), 1); } catch (TException te) { assertNull(te); @@ -3090,28 +2717,24 @@ public void testConstraints() throws IOException { run("CREATE TABLE " + dbName + ".tbl5(a string, b string, foreign key (a, b) references " + dbName + ".tbl4(a, b) disable novalidate)", driver); run("CREATE TABLE " + dbName + ".tbl6(a string, b string not null disable, unique (a) disable)", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; String pkName = null; String ukName = null; String fkName = null; String nnName = null; try { - List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl4")); + List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(replDbName , "tbl4")); assertEquals(pks.size(), 2); pkName = pks.get(0).getPk_name(); - List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(DEFAULT_CATALOG_NAME, dbName+ "_dupe" , "tbl6")); + List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(DEFAULT_CATALOG_NAME, replDbName , "tbl6")); assertEquals(uks.size(), 1); ukName = uks.get(0).getUk_name(); - List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl5")); + List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, replDbName , "tbl5")); assertEquals(fks.size(), 2); fkName = fks.get(0).getFk_name(); - List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(DEFAULT_CATALOG_NAME, dbName+ "_dupe" , "tbl6")); + List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(DEFAULT_CATALOG_NAME, replDbName , "tbl6")); assertEquals(nns.size(), 1); nnName = nns.get(0).getNn_name(); @@ -3124,21 +2747,15 @@ public void testConstraints() throws IOException { run("ALTER TABLE " + dbName + ".tbl5 DROP CONSTRAINT `" + fkName + "`", driver); run("ALTER TABLE " + dbName + ".tbl6 DROP CONSTRAINT `" + nnName + "`", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - + incrementalLoadAndVerify(dbName, replDumpId, replDbName); try { - List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl4")); + List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(replDbName , "tbl4")); assertTrue(pks.isEmpty()); - List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(DEFAULT_CATALOG_NAME, dbName+ "_dupe" , "tbl4")); + List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(DEFAULT_CATALOG_NAME, replDbName , "tbl4")); assertTrue(uks.isEmpty()); - List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl5")); + List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, replDbName , "tbl5")); assertTrue(fks.isEmpty()); - List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(DEFAULT_CATALOG_NAME, dbName+ "_dupe" , "tbl6")); + List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(DEFAULT_CATALOG_NAME, replDbName , "tbl6")); assertTrue(nns.isEmpty()); } catch (TException te) { assertNull(te); @@ -3149,6 +2766,7 @@ public void testConstraints() throws IOException { public void testRemoveStats() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; String[] unptn_data = new String[]{ "1" , "2" }; String[] ptn_data_1 = new String[]{ "5", "7", "8"}; @@ -3178,17 +2796,13 @@ public void testRemoveStats() throws IOException { verifySetup("SELECT max(a) from " + dbName + ".unptned", new String[]{"2"}, driver); verifySetup("SELECT max(a) from " + dbName + ".ptned where b=1", new String[]{"8"}, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); - String replDumpId = getResult(0,1,true,driver); - LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; - verifyRun("SELECT count(*) from " + dbName + "_dupe.unptned", new String[]{"2"}, driverMirror); - verifyRun("SELECT count(*) from " + dbName + "_dupe.ptned", new String[]{"3"}, driverMirror); - verifyRun("SELECT max(a) from " + dbName + "_dupe.unptned", new String[]{"2"}, driverMirror); - verifyRun("SELECT max(a) from " + dbName + "_dupe.ptned where b=1", new String[]{"8"}, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned", new String[]{"2"}, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".ptned", new String[]{"3"}, driverMirror); + verifyRun("SELECT max(a) from " + replDbName + ".unptned", new String[]{"2"}, driverMirror); + verifyRun("SELECT max(a) from " + replDbName + ".ptned where b=1", new String[]{"8"}, driverMirror); run("CREATE TABLE " + dbName + ".unptned2(a int) STORED AS TEXTFILE", driver); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2", driver); @@ -3199,23 +2813,19 @@ public void testRemoveStats() throws IOException { run("ANALYZE TABLE " + dbName + ".ptned2 partition(b) COMPUTE STATISTICS FOR COLUMNS", driver); run("ANALYZE TABLE " + dbName + ".ptned2 partition(b) COMPUTE STATISTICS", driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0,0,driver); - String incrementalDumpId = getResult(0,1,true,driver); - LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); - - verifyRun("SELECT count(*) from " + dbName + "_dupe.unptned2", new String[]{"2"}, driverMirror); - verifyRun("SELECT count(*) from " + dbName + "_dupe.ptned2", new String[]{"3"}, driverMirror); - verifyRun("SELECT max(a) from " + dbName + "_dupe.unptned2", new String[]{"2"}, driverMirror); - verifyRun("SELECT max(a) from " + dbName + "_dupe.ptned2 where b=1", new String[]{"8"}, driverMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyRun("SELECT count(*) from " + replDbName + ".unptned2", new String[]{"2"}, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".ptned2", new String[]{"3"}, driverMirror); + verifyRun("SELECT max(a) from " + replDbName + ".unptned2", new String[]{"2"}, driverMirror); + verifyRun("SELECT max(a) from " + replDbName + ".ptned2 where b=1", new String[]{"8"}, driverMirror); } + // TODO: This test should be removed once ACID tables replication is supported. @Test public void testSkipTables() throws Exception { String testName = "skipTables"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables. // If I change it to use proper txn manager, the setup for some tests hangs. @@ -3230,27 +2840,22 @@ public void testSkipTables() throws Exception { verifyIfTableExist(dbName, "mm_table", metaStoreClient); // Bootstrap test - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0,driver); - String replDumpId = getResult(0, 1, true, driver); - LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "acid_table", metaStoreClientMirror); - verifyIfTableNotExist(dbName + "_dupe", "mm_table", metaStoreClientMirror); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + verifyIfTableNotExist(replDbName, "acid_table", metaStoreClientMirror); + verifyIfTableNotExist(replDbName, "mm_table", metaStoreClientMirror); // Test alter table run("ALTER TABLE " + dbName + ".acid_table RENAME TO " + dbName + ".acid_table_rename", driver); verifyIfTableExist(dbName, "acid_table_rename", metaStoreClient); + // Dummy create table command to mark proper last repl ID after dump + run("CREATE TABLE " + dbName + ".dummy (a int)", driver); + // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "acid_table_rename", metaStoreClientMirror); + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyIfTableNotExist(replDbName, "acid_table_rename", metaStoreClientMirror); // Create another table for incremental repl verification run("CREATE TABLE " + dbName + ".acid_table_incremental (key int, value int) PARTITIONED BY (load_date date) " + @@ -3261,17 +2866,14 @@ public void testSkipTables() throws Exception { verifyIfTableExist(dbName, "acid_table_incremental", metaStoreClient); verifyIfTableExist(dbName, "mm_table_incremental", metaStoreClient); + // Dummy insert into command to mark proper last repl ID after dump + run("INSERT INTO " + dbName + ".dummy values(1)", driver); + // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - printOutput(driverMirror); - run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "acid_table_incremental", metaStoreClientMirror); - verifyIfTableNotExist(dbName + "_dupe", "mm_table_incremental", metaStoreClientMirror); + incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); + replDumpId = incrementalDump.lastReplId; + verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror); + verifyIfTableNotExist(replDbName, "mm_table_incremental", metaStoreClientMirror); // Test adding a constraint run("ALTER TABLE " + dbName + ".acid_table_incremental ADD CONSTRAINT key_pk PRIMARY KEY (key) DISABLE NOVALIDATE", driver); @@ -3282,23 +2884,20 @@ public void testSkipTables() throws Exception { assertNull(te); } + // Dummy insert into command to mark proper last repl ID after dump + run("INSERT INTO " + dbName + ".dummy values(2)", driver); + // Perform REPL-DUMP/LOAD - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); - incrementalDumpLocn = getResult(0, 0, driver); - incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - printOutput(driverMirror); - run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); - verifyIfTableNotExist(dbName + "_dupe", "acid_table_incremental", metaStoreClientMirror); + incrementalLoadAndVerify(dbName, replDumpId, replDbName); + verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror); } @Test public void testDeleteStagingDir() throws IOException { - String testName = "deleteStagingDir"; - String dbName = createDB(testName, driver); - String tableName = "unptned"; + String testName = "deleteStagingDir"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; + String tableName = "unptned"; run("CREATE TABLE " + StatsUtils.getFullyQualifiedTableName(dbName, tableName) + "(a string) STORED AS TEXTFILE", driver); @@ -3309,19 +2908,17 @@ public void testDeleteStagingDir() throws IOException { verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); // Perform repl - advanceDumpDir(); - run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0,0,driver); + String replDumpLocn = replDumpDb(dbName, null, null, null).dumpLocation; // Reset the driver driverMirror.close(); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); // Calling close() explicitly to clean up the staging dirs driverMirror.close(); // Check result Path warehouse = new Path(System.getProperty("test.warehouse.dir", "/tmp")); FileSystem fs = FileSystem.get(warehouse.toUri(), hconf); try { - Path path = new Path(warehouse, dbName + "_dupe.db" + Path.SEPARATOR + tableName); + Path path = new Path(warehouse, replDbName + ".db" + Path.SEPARATOR + tableName); // First check if the table dir exists (could have been deleted for some reason in pre-commit tests) if (!fs.exists(path)) { @@ -3347,6 +2944,7 @@ public boolean accept(Path path) public void testCMConflict() throws IOException { String testName = "cmConflict"; String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; // Create table and insert two file of the same content run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); @@ -3354,25 +2952,19 @@ public void testCMConflict() throws IOException { run("INSERT INTO TABLE " + dbName + ".unptned values('ten')", driver); // Bootstrap test + Tuple bootstrapDump = replDumpDb(dbName, null, null, null); advanceDumpDir(); run("REPL DUMP " + dbName, driver); - String replDumpLocn = getResult(0, 0,driver); - String replDumpId = getResult(0, 1, true, driver); + String replDumpLocn = bootstrapDump.dumpLocation; + String replDumpId = bootstrapDump.lastReplId; // Drop two files so they are moved to CM run("TRUNCATE TABLE " + dbName + ".unptned", driver); LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror); - verifyRun("SELECT count(*) from " + dbName + "_dupe.unptned", new String[]{"2"}, driverMirror); - } - - private static String createDB(String name, IDriver myDriver) { - LOG.info("Testing " + name); - String dbName = name + "_" + tid; - run("CREATE DATABASE " + dbName, myDriver); - return dbName; + verifyRun("SELECT count(*) from " + replDbName + ".unptned", new String[]{"2"}, driverMirror); } @Test @@ -3555,6 +3147,13 @@ public void testRecycleFileDropTempTable() throws IOException { assertTrue(fileCount == fileCountAfter); } + private static String createDB(String name, IDriver myDriver) { + LOG.info("Testing " + name); + String dbName = name + "_" + tid; + run("CREATE DATABASE " + dbName, myDriver); + return dbName; + } + private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) { MessageFactory msgFactory = MessageFactory.getInstance(); Table t = new Table(); @@ -3572,16 +3171,13 @@ private NotificationEvent createDummyEvent(String dbname, String tblname, long e return event; } - private String verifyAndReturnDbReplStatus(String dbName, String tblName, String prevReplDumpId, String cmd) throws IOException { + private String verifyAndReturnDbReplStatus(String dbName, String tblName, + String prevReplDumpId, String cmd, + String replDbName) throws IOException { run(cmd, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + prevReplDumpId, driver); - String lastDumpLocn = getResult(0, 0, driver); - String lastReplDumpId = getResult(0, 1, true, driver); - run("REPL LOAD " + dbName + "_dupe FROM '" + lastDumpLocn + "'", driverMirror); - verifyRun("REPL STATUS " + dbName + "_dupe", lastReplDumpId, driverMirror); + String lastReplDumpId = incrementalLoadAndVerify(dbName, prevReplDumpId, replDbName).lastReplId; if (tblName != null){ - verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId, driverMirror); + verifyRun("REPL STATUS " + replDbName + "." + tblName, lastReplDumpId, driverMirror); } assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); return lastReplDumpId; @@ -3589,20 +3185,16 @@ private String verifyAndReturnDbReplStatus(String dbName, String tblName, String // Tests that doing a table-level REPL LOAD updates table repl.last.id, but not db-level repl.last.id private String verifyAndReturnTblReplStatus( - String dbName, String tblName, String lastDbReplDumpId, String prevReplDumpId, String cmd) throws IOException { + String dbName, String tblName, String lastDbReplDumpId, String prevReplDumpId, String cmd, + String replDbName) throws IOException { run(cmd, driver); - advanceDumpDir(); - run("REPL DUMP " + dbName + "."+ tblName + " FROM " + prevReplDumpId, driver); - String lastDumpLocn = getResult(0, 0, driver); - String lastReplDumpId = getResult(0, 1, true, driver); - run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'", driverMirror); - verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId, driverMirror); - verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId, driverMirror); + String lastReplDumpId + = incrementalLoadAndVerify(dbName + "." + tblName, prevReplDumpId, replDbName + "." + tblName).lastReplId; + verifyRun("REPL STATUS " + replDbName, lastDbReplDumpId, driverMirror); assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); return lastReplDumpId; } - private String getResult(int rowNum, int colNum, IDriver myDriver) throws IOException { return getResult(rowNum,colNum,false, myDriver); }