diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 39722a0dd2..16da5f2d14 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -47,6 +49,9 @@ import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -55,6 +60,7 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.parse.EximUtil.LAST_EVENT_ID_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -663,6 +669,454 @@ public void testMultiDBTxn() throws Throwable { } } + @Test + public void testIncrementalDumpCheckpointing() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {}); + + + //Case 1: When the last dump finished all the events and + //only _finished_dump file at the hiveDumpRoot was about to be written when it failed. 
+ ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .run("insert into t2 values (2)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + + long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > (firstIncEventID + 1)); + Map pathModTimeMap = new HashMap<>(); + for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + assertTrue(fs.exists(ackFile)); + //check events were not rewritten. + for(Map.Entry entry :pathModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), fs.getFileStatus(new Path(hiveDumpDir, entry.getKey())).getModificationTime()); + } + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2"}); + + + //Case 2: When the last dump was half way through + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName) + .run("insert into t1 values (3)") + .run("insert into t2 values (4)") + .dump(primaryDbName); + + hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + //delete last three events and test if it recovers. 
+ long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId); + Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID)); + Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1)); + Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2)); + assertTrue(fs.exists(lastEvtRoot)); + assertTrue(fs.exists(secondLastEvtRoot)); + assertTrue(fs.exists(thirdLastEvtRoot)); + + pathModTimeMap = new HashMap<>(); + for (long idx = Long.parseLong(incrementalDump2.lastReplicationId)+1; idx < (lastEventID - 2); idx ++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(idx)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + long lastEvtModTimeOld = fs.getFileStatus(lastEvtRoot).getModificationTime(); + long secondLastEvtModTimeOld = fs.getFileStatus(secondLastEvtRoot).getModificationTime(); + long thirdLastEvtModTimeOld = fs.getFileStatus(thirdLastEvtRoot).getModificationTime(); + + fs.delete(lastEvtRoot, true); + fs.delete(secondLastEvtRoot, true); + fs.delete(thirdLastEvtRoot, true); + List> listValues = new ArrayList<>(); + listValues.add( + Arrays.asList( + LAST_EVENT_ID_NAME, + String.valueOf(lastEventID - 3) + ) + ); + org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, ackLastEventID, primary.hiveConf, true); + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + + assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation); + + verifyPathExist(fs, ackFile); + verifyPathExist(fs, ackLastEventID); + verifyPathExist(fs, lastEvtRoot); + verifyPathExist(fs, secondLastEvtRoot); + verifyPathExist(fs, thirdLastEvtRoot); + assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > lastEvtModTimeOld); + assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > secondLastEvtModTimeOld); + assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > thirdLastEvtModTimeOld); + + //Check other event dump files have not been modified. 
+ for (Map.Entry entry:pathModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), fs.getFileStatus(entry.getKey()).getModificationTime()); + } + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "3"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2", "4"}); + } + + @Test + public void testIncrementalResumeDump() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackLastEventID, false); + fs.delete(ackFile, false); + //delete only last event root dir + Path lastEventRoot = new Path(hiveDumpDir, String.valueOf(incrementalDump1.lastReplicationId)); + assertTrue(fs.exists(lastEventRoot)); + fs.delete(lastEventRoot, true); + + // It should create a fresh dump dir as _events_dump doesn't exist. + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation); + assertTrue(incrementalDump1.lastReplicationId != incrementalDump2.lastReplicationId); + assertTrue(fs.getFileStatus(new Path(incrementalDump2.dumpLocation)).getModificationTime() + > fs.getFileStatus(new Path(incrementalDump1.dumpLocation)).getModificationTime()); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + } + + @Test + public void testCheckpointingOnFirstEventDump() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}); + + // Testing a scenario where first event was getting dumped and it failed. 
+ ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackFile, false); + fs.delete(ackLastEventID, false); + fs.delete(dumpMetaData, false); + //delete all the event folder except first one. + long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > (firstIncEventID + 1)); + + for (long eventId=firstIncEventID + 1; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + fs.delete(eventRoot, true); + } + } + + Path firstIncEventRoot = new Path(hiveDumpDir, String.valueOf(firstIncEventID)); + long firstIncEventModTimeOld = fs.getFileStatus(firstIncEventRoot).getModificationTime(); + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + + assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation); + hiveDumpDir = new Path(incrementalDump2.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + firstIncEventRoot = new Path(hiveDumpDir, String.valueOf(firstIncEventID)); + assertTrue(fs.exists(ackFile)); + long firstIncEventModTimeNew = fs.getFileStatus(firstIncEventRoot).getModificationTime(); + assertTrue(firstIncEventModTimeNew > firstIncEventModTimeOld); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + } + + @Test + public void testCheckpointingIncrWithTableDrop() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t2 values (2)") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2"}); + + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("CREATE TABLE t3(a string) STORED AS TEXTFILE") + .run("insert into t3 values (3)") + .run("insert into t1 values (4)") + .run("DROP TABLE t2") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + 
assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackFile, false); + fs.delete(dumpMetaData, false); + //delete last five events + long fifthLastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId) - 4; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > fifthLastIncEventID ); + + for (long eventId=fifthLastIncEventID + 1; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + fs.delete(eventRoot, true); + } + } + + List> listValues = new ArrayList<>(); + listValues.add( + Arrays.asList( + LAST_EVENT_ID_NAME, + String.valueOf(fifthLastIncEventID) + ) + ); + org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, ackLastEventID, primary.hiveConf, true); + + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + assertTrue(fs.exists(ackFile)); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "4"}) + .run("select * from " + replicatedDbName + ".t3") + .verifyResults(new String[] {"3"}) + .runFailure("select * from " + replicatedDbName + ".t2"); + } + + @Test + public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() throws Throwable { + List dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("create table t1(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .dump(primaryDbName, dumpClause); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2"}); + + dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'"); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("create table t2(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t2 values (3)") + .run("insert into t2 values (4)") + .run("insert into t2 values (5)") + .dump(primaryDbName, dumpClause); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path 
bootstrapDir = new Path(hiveDumpDir, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME); + + Path t2dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME + File.separator + + primaryDbName + File.separator + "t2"); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + + verifyPathExist(fs, ackFile); + verifyPathExist(fs, ackLastEventID); + + long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime(); + long oldT2DatadirModTime = fs.getFileStatus(t2dataDir).getModificationTime(); + + fs.delete(ackFile, false); + + //Do another dump and verify that the metadata is rewritten and the data folder is not + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName, dumpClause); + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + assertTrue(fs.exists(ackFile)); + verifyPathExist(fs, ackFile); + verifyPathExist(fs, ackLastEventID); + + long newMetadirModTime = fs.getFileStatus(metaDir).getModificationTime(); + long newT2DatadirModTime = fs.getFileStatus(t2dataDir).getModificationTime(); + + assertTrue(newMetadirModTime > oldMetadirModTime); + assertEquals(oldT2DatadirModTime, newT2DatadirModTime); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"3", "4", "5"}); + } + + @Test + public void testHdfsMaxDirItemsLimitDuringIncremental() throws Throwable { + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("create table t1(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t1 values (1)") + .dump(primaryDbName); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + + List dumpClause = Arrays.asList("'" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "'='" + + (ReplUtils.RESERVED_DIR_ITEMS_COUNT + 5) +"'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'"); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t1 values (4)") + .run("insert into t1 values (5)") + .run("insert into t1 values (6)") + .run("insert into t1 values (7)") + .run("insert into t1 values (8)") + .run("insert into t1 values (9)") + .run("insert into t1 values (10)") + .run("create table t2(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t2 values (100)") + .dump(primaryDbName, dumpClause); + + int eventCount = Integer.parseInt(incrementalDump1.lastReplicationId) + - Integer.parseInt(bootstrapDump.lastReplicationId); + assertEquals(eventCount, 5); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"100"}); + + dumpClause = Arrays.asList("'" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "'='1000'"); + + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName, dumpClause); + + eventCount = Integer.parseInt(incrementalDump2.lastReplicationId) + - Integer.parseInt(incrementalDump1.lastReplicationId); + assertTrue(eventCount > 5); + +
replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}); + } + @Test public void testCheckPointingDataDumpFailure() throws Throwable { //To force distcp copy @@ -692,6 +1146,7 @@ public void testCheckPointingDataDumpFailure() throws Throwable { Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase()); Path tablet1Path = new Path(dbDataPath, "t1"); Path tablet2Path = new Path(dbDataPath, "t2"); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2 fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); @@ -965,4 +1420,8 @@ public void testCheckPointingMetadataDumpFailure() throws Throwable { dumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); } + + private void verifyPathExist(FileSystem fs, Path filePath) throws IOException { + assertTrue("Path not found:" + filePath, fs.exists(filePath)); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index e1b8b81f49..1adc4fbc1f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -925,7 +925,7 @@ public void testIncrementalDumpMultiIteration() throws Throwable { Path path = new Path(hiveDumpDir); FileSystem fs = path.getFileSystem(conf); FileStatus[] fileStatus = fs.listStatus(path); - int numEvents = fileStatus.length - 2; //one is metadata file and one is _dump ack + int numEvents = fileStatus.length - 3; //for _metadata, _finished_dump and _events_dump replica.load(replicatedDbName, primaryDbName, Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'")) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index b7a9888688..8ceea57b5a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -563,7 +563,7 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { // _bootstrap//t2 // _bootstrap//t3 - Path dbPath = new Path(dumpPath, primaryDbName); + Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName); Path tblPath = new Path(dbPath, "t2"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); tblPath = new Path(dbPath, "t3"); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index 8b2c556ce6..e3e66612ec 100644 --- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -502,9 +502,9 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { Path dumpPath = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME); assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); - // _bootstrap//t2 - // _bootstrap//t3 - Path dbPath = new Path(dumpPath, primaryDbName); + // _bootstrap/metadata//t2 + // _bootstrap/metadata//t3 + Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName); Path tblPath = new Path(dbPath, "t2"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); tblPath = new Path(dbPath, "t3"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java index 4a64927bad..d99bf54f62 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -30,6 +34,10 @@ import org.junit.Test; import org.junit.BeforeClass; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import java.util.HashMap; @@ -38,6 +46,7 @@ * TestScheduledReplicationScenarios - test scheduled replication .
*/ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosAcidTables { + private static final long DEFAULT_PROBE_TIMEOUT = 2 * 60 * 1000L; // 2 minutes @BeforeClass public static void classLevelSetup() throws Exception { @@ -102,15 +111,19 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { .run("insert into t1 values(2)"); try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) { - int next = 0; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("create scheduled query s1_t1 every 10 minutes as repl dump " + primaryDbName); - primary.run("alter scheduled query s1_t1 execute"); - Thread.sleep(6000); - replica.run("create scheduled query s2_t1 every 10 minutes as repl load " + primaryDbName + " INTO " + int next = -1; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true); + primary.run("create scheduled query s1_t1 every 5 seconds as repl dump " + primaryDbName); + replica.run("create scheduled query s2_t1 every 5 seconds as repl load " + primaryDbName + " INTO " + replicatedDbName); - replica.run("alter scheduled query s2_t1 execute"); - Thread.sleep(20000); + Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf); + + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -121,12 +134,10 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { primary.run("use " + primaryDbName) .run("insert into t1 values(3)") .run("insert into t1 values(4)"); - next++; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("alter scheduled query s1_t1 execute"); - Thread.sleep(20000); - replica.run("alter scheduled query s2_t1 execute"); - Thread.sleep(20000); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -137,12 +148,10 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { primary.run("use " + primaryDbName) .run("insert into t1 values(5)") .run("insert into t1 values(6)"); - next++; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("alter scheduled query s1_t1 execute"); - Thread.sleep(30000); - replica.run("alter scheduled query s2_t1 execute"); - Thread.sleep(30000); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -168,15 +177,18 @@ public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { 
.run("insert into t2 values(2)"); try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) { - int next = 0; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("create scheduled query s1_t2 every 10 minutes as repl dump " + primaryDbName + withClause); - primary.run("alter scheduled query s1_t2 execute"); - Thread.sleep(80000); - replica.run("create scheduled query s2_t2 every 10 minutes as repl load " + primaryDbName + " INTO " + int next = -1; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true); + primary.run("create scheduled query s1_t2 every 5 seconds as repl dump " + primaryDbName + withClause); + replica.run("create scheduled query s2_t2 every 5 seconds as repl load " + primaryDbName + " INTO " + replicatedDbName); - replica.run("alter scheduled query s2_t2 execute"); - Thread.sleep(80000); + Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't2'") .verifyResult("t2") @@ -187,22 +199,37 @@ public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { primary.run("use " + primaryDbName) .run("insert into t2 values(3)") .run("insert into t2 values(4)"); - next++; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("alter scheduled query s1_t2 execute"); - Thread.sleep(80000); - replica.run("alter scheduled query s2_t2 execute"); - Thread.sleep(80000); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't2'") .verifyResult("t2") .run("select id from t2 order by id") .verifyResults(new String[]{"1", "2", "3", "4"}); - - } finally { primary.run("drop scheduled query s1_t2"); replica.run("drop scheduled query s2_t2"); } } + private void waitForAck(FileSystem fs, Path ackFile, long timeout) throws IOException { + long oldTime = System.currentTimeMillis(); + long sleepInterval = 2; + + while(true) { + if (fs.exists(ackFile)) { + return; + } + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + //no-op + } + if (System.currentTimeMillis() - oldTime > timeout) { + break; + } + } + throw new IOException("Timed out waiting for the ack file: " + ackFile.toString()); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 93e24ef852..5bdf5515e5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; @@ -242,13 +241,8 @@ private void verifyBootstrapDirInIncrementalDump(String dumpLocation, String[] b Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); // Check if the DB dump path have any tables other than the ones listed in bootstrappedTables. - Path dbPath = new Path(dumpPath, primaryDbName); - FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath, new PathFilter() { - @Override - public boolean accept(Path path) { - return !path.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); - } - }); + Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName); + FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath); Assert.assertEquals(fileStatuses.length, bootstrappedTables.length); // Eg: _bootstrap//t2, _bootstrap//t3 etc diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java index db8db5f8e7..d9c873b332 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -22,6 +22,7 @@ */ public enum ReplAck { DUMP_ACKNOWLEDGEMENT("_finished_dump"), + EVENTS_DUMP("_events_dump"), LOAD_ACKNOWLEDGEMENT("_finished_load"); private String ack; ReplAck(String ack) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 2e0af02094..907bf3949f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; +import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -78,9 +79,12 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.io.Serializable; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; import java.util.HashSet; @@ -94,6 +98,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.parse.EximUtil.LAST_EVENT_ID_NAME; public class ReplDumpTask extends Task implements Serializable { private static final long serialVersionUID = 1L; @@ -137,16 +142,17 @@ public int execute() { Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() .getBytes(StandardCharsets.UTF_8.name()))); Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); + boolean isBootstrap = (previousValidHiveDumpPath == null); //If no previous dump is present or previous 
dump is already loaded, proceed with the dump operation. if (shouldDump(previousValidHiveDumpPath)) { - Path currentDumpPath = getCurrentDumpPath(dumpRoot); + Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; - if (previousValidHiveDumpPath == null) { + if (isBootstrap) { lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath)); @@ -167,11 +173,12 @@ public int execute() { return 0; } - private Path getCurrentDumpPath(Path dumpRoot) throws IOException { - Path previousDumpPath = getPreviousDumpPath(dumpRoot); - if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) { + private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOException { + Path lastDumpPath = getLatestDumpPath(dumpRoot); + if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, isBootstrap)) { //Resume previous dump - return previousDumpPath; + LOG.info("Resuming the dump with existing dump directory {}", lastDumpPath); + return lastDumpPath; } else { return new Path(dumpRoot, getNextDumpDir()); } @@ -225,7 +232,7 @@ private void deleteAllPreviousDumpMeta(Path currentDumpPath) { } private Path getDumpRoot(Path currentDumpPath) { - if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) { //testDeleteDumpMetaDumpPath to be used only for test. return null; } else { @@ -425,11 +432,16 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(hiveDb); + + int maxEventLimit = getMaxEventAllowed(work.maxEventLimit()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, work.eventFrom, work.maxEventLimit(), evFilter); + evFetcher, work.eventFrom, maxEventLimit, evFilter); lastReplId = work.eventTo; + Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString()); + Long resumeFrom = Utils.fileExists(ackFile, conf) ? getResumeFrom(ackFile, work.eventFrom) : work.eventFrom; + // Right now the only pattern allowed to be specified is *, which matches all the database // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter // on database name when it's *. In future, if we support more elaborate patterns, we will @@ -438,16 +450,24 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? 
work.dbNameOrPattern : "?"; - int maxEventLimit = work.maxEventLimit(); replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit), work.eventFrom, work.eventTo, maxEventLimit); replLogger.startLog(); + Long dumpedCount = resumeFrom - work.eventFrom; + if (dumpedCount > 0) { + LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); + } + cleanFailedEventDirIfExists(dumpRoot, resumeFrom); while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); + if (ev.getEventId() <= resumeFrom) { + continue; + } dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); + updateLastEventDumped(ackFile, lastReplId); } replLogger.endLog(lastReplId.toString()); @@ -460,7 +480,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive if (work.oldReplScope != null) { dmd.setReplScope(work.replScope); } - dmd.write(); + dmd.write(true); // Examine all the tables if required. if (shouldExamineTablesToDump() || (tableList != null)) { @@ -470,8 +490,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); } + /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata. + We need to rewrite the metadata as the write id list will be changed. + We can't reuse the previous write id as it might be invalid due to compaction. */ + Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME); + FileSystem fs = FileSystem.get(metadataPath.toUri(), conf); + if (fs.exists(metadataPath)) { + fs.delete(metadataPath, true); + } + Path dbRootMetadata = new Path(metadataPath, dbName); + Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); managedTableCopyPaths = new ArrayList<>(); - Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); List extTableLocations = new LinkedList<>(); try (Writer writer = new Writer(dumpRoot, conf)) { for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { @@ -487,10 +517,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - Path dbDataRoot = new Path(dbRoot, EximUtil.DATA_PATH_NAME); managedTableCopyPaths.addAll( - dumpTable(dbName, tableName, validTxnList, - dbRoot, dbDataRoot, bootDumpBeginReplId, + dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, hiveDb, tableTuple)); } if (tableList != null && isTableSatifiesConfig(table)) { @@ -511,6 +539,77 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive return lastReplId; } + private int getMaxEventAllowed(int currentEventMaxLimit) { + int maxDirItems = Integer.parseInt(conf.get(ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG, "0")); + if (maxDirItems > 0) { + maxDirItems = maxDirItems - ReplUtils.RESERVED_DIR_ITEMS_COUNT; + if (maxDirItems < currentEventMaxLimit) { + LOG.warn("Changing the maxEventLimit from {} to {} as the '" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + + "' limit encountered. 
Set this config appropriately to increase the maxEventLimit", + currentEventMaxLimit, maxDirItems); + currentEventMaxLimit = maxDirItems; + } + } + return currentEventMaxLimit; + } + + private void cleanFailedEventDirIfExists(Path dumpDir, Long resumeFrom) throws IOException { + Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1)); + Retry retriable = new Retry(IOException.class) { + @Override + public Void execute() throws IOException { + FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf); + if (fs.exists(nextEventRoot)) { + fs.delete(nextEventRoot, true); + } + return null; + } + }; + try { + retriable.run(); + } catch (Exception e) { + throw new IOException(e); + } + } + + public void updateLastEventDumped(Path ackFile, Long lastReplId) throws SemanticException { + List> listValues = new ArrayList<>(); + listValues.add( + Arrays.asList( + LAST_EVENT_ID_NAME, + String.valueOf(lastReplId) + ) + ); + Utils.writeOutput(listValues, ackFile, conf, true); + } + + private long getResumeFrom(Path ackFile, long defID) throws SemanticException { + long lastEventID = defID; + BufferedReader br = null; + try { + FileSystem fs = ackFile.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset())); + String line = br.readLine(); + if (line != null) { + String[] lineContents = line.split("\t", 5); + lastEventID = Long.parseLong(lineContents[1]); + } else { + LOG.warn("Unable to read lastEventID from ackFile:{}, defaulting to:{}", ackFile.toUri().toString(), defID); + } + } catch (IOException ioe) { + throw new SemanticException(ioe); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + return lastEventID; + } + private boolean needBootstrapAcidTablesDuringIncrementalDump() { // If acid table dump is not enabled, then no neeed to check further. 
if (!ReplUtils.includeAcidTableInDump(conf)) { @@ -526,13 +625,6 @@ private boolean needBootstrapAcidTablesDuringIncrementalDump() { || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); } - private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) { - if (isIncrementalPhase) { - dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); - } - return new Path(dumpRoot, dbName); - } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, @@ -715,7 +807,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) LOG.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); - dmd.write(); + dmd.write(true); work.setDirCopyIterator(extTableCopyWorks.iterator()); work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); @@ -731,9 +823,17 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { } } - private boolean shouldResumePreviousDump(Path dumpPath) { - Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR); - return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + private boolean shouldResumePreviousDump(Path lastDumpPath, boolean isBootStrap) throws IOException { + if (validDump(lastDumpPath)) { + return false; + } + Path hiveDumpPath = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + if (isBootStrap) { + return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + } + // In case of incremental we should resume if _events_dump file is present + Path lastEventFile = new Path(hiveDumpPath, ReplAck.EVENTS_DUMP.toString()); + return FileSystem.get(lastEventFile.toUri(), conf).exists(lastEventFile); } long currentNotificationId(Hive hiveDb) throws TException { @@ -742,7 +842,7 @@ long currentNotificationId(Hive hiveDb) throws TException { Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception { // TODO : instantiating FS objects are generally costly. Refactor - Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false); + Path dbRoot = new Path(metadataRoot, dbName); FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); HiveWrapper.Tuple database = new HiveWrapper(hiveDb, dbName, lastReplId).database(); @@ -854,10 +954,11 @@ private String getNextDumpDir() { // make it easy to write .q unit tests, instead of unique id generation. // however, this does mean that in writing tests, we have to be aware that // repl dump will clash with prior dumps, and thus have to clean up properly. 
- if (ReplDumpWork.testInjectDumpDir == null) { + String nextDump = ReplDumpWork.getInjectNextDumpDirForTest(); + if (nextDump == null) { return "next"; } else { - return ReplDumpWork.testInjectDumpDir; + return nextDump; } } else { return UUID.randomUUID().toString(); @@ -867,7 +968,7 @@ private String getNextDumpDir() { } } - private Path getPreviousDumpPath(Path dumpRoot) throws IOException { + private Path getLatestDumpPath(Path dumpRoot) throws IOException { FileSystem fs = dumpRoot.getFileSystem(conf); if (fs.exists(dumpRoot)) { FileStatus[] statuses = fs.listStatus(dumpRoot); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 1f0d70212c..0e50ec61a4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -47,7 +47,8 @@ final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath; Long eventTo; Long eventFrom; - static String testInjectDumpDir = null; + private static String testInjectDumpDir = null; + private static boolean testInjectDumpDirAutoIncrement = false; static boolean testDeletePreviousDumpMetaPath = false; private Integer maxEventLimit; private transient Iterator dirCopyIterator; @@ -56,7 +57,22 @@ private List resultValues; public static void injectNextDumpDirForTest(String dumpDir) { + injectNextDumpDirForTest(dumpDir, false); + } + public static void injectNextDumpDirForTest(String dumpDir, boolean autoIncr) { testInjectDumpDir = dumpDir; + testInjectDumpDirAutoIncrement = autoIncr; + } + + public static synchronized String getTestInjectDumpDir() { + return testInjectDumpDir; + } + + public static synchronized String getInjectNextDumpDirForTest() { + if (testInjectDumpDirAutoIncrement) { + testInjectDumpDir = String.valueOf(Integer.parseInt(testInjectDumpDir) + 1); + } + return testInjectDumpDir; } public static void testDeletePreviousDumpMetaPath(boolean failDeleteDumpMeta) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 56efa32cb6..f072effc31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -84,8 +84,8 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); FileSystem fs = incBootstrapDir.getFileSystem(hiveConf); if (fs.exists(incBootstrapDir)) { - this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn, - true, hiveConf); + this.bootstrapIterator = new BootstrapEventsIterator( + new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); } else { this.bootstrapIterator = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index a311f7ae22..4cf316a42c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -180,9 +180,6 @@ public BootstrapEvent next() { 
} private Path getDbLevelDataPath() { - if (dbLevelPath.toString().contains(Path.SEPARATOR + ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME + Path.SEPARATOR)) { - return new Path(dbLevelPath, EximUtil.DATA_PATH_NAME); - } return new Path(new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME), dbLevelPath.getName()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 939cbc3a35..4fcee0e34a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -94,6 +94,12 @@ // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be // seen in production or in case of tests other than the ones where it's required. public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables"; + + // HDFS Config to define the maximum number of items a directory may contain. + public static final String DFS_MAX_DIR_ITEMS_CONFIG = "dfs.namenode.fs-limits.max-directory-items"; + + // Reserved number of items to accommodate operational files in the dump root dir. + public static final int RESERVED_DIR_ITEMS_COUNT = 10; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. */ @@ -236,7 +242,6 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) - && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME) && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); @@ -248,7 +253,6 @@ public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) { return p -> { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) - && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME) && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index fb6a38cd43..5ae1e789ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -76,6 +76,7 @@ public static final String FILES_NAME = "_files"; public static final String DATA_PATH_NAME = "data"; public static final String METADATA_PATH_NAME = "metadata"; + public static final String LAST_EVENT_ID_NAME = "lastEventId"; private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 73dc606d87..3de583276e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -114,7 +114,7 @@ String threadName = Thread.currentThread().getName(); LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName); try { - // this the data copy + // Data Copy in case of ExportTask List dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); Path rootDataDumpDir = 
paths.partitionMetadataExportDir(partitionName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index bdf09ac92b..b85ca507d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -64,22 +64,36 @@ public static void writeOutput(List> listValues, Path outputFile, HiveConf hiveConf) throws SemanticException { - DataOutputStream outStream = null; - try { - FileSystem fs = outputFile.getFileSystem(hiveConf); - outStream = fs.create(outputFile); - for (List values : listValues) { - outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); - for (int i = 1; i < values.size(); i++) { - outStream.write(Utilities.tabCode); - outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i))); + writeOutput(listValues, outputFile, hiveConf, false); + } + + public static void writeOutput(List> listValues, Path outputFile, HiveConf hiveConf, boolean update) + throws SemanticException { + Retry retriable = new Retry(IOException.class) { + @Override + public Void execute() throws IOException { + DataOutputStream outStream = null; + try { + FileSystem fs = outputFile.getFileSystem(hiveConf); + outStream = fs.create(outputFile, update); + for (List values : listValues) { + outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); + for (int i = 1; i < values.size(); i++) { + outStream.write(Utilities.tabCode); + outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i))); + } + outStream.write(Utilities.newLineCode); + } + } finally { + IOUtils.closeStream(outStream); } - outStream.write(Utilities.newLineCode); + return null; } - } catch (IOException e) { + }; + try { + retriable.run(); + } catch (Exception e) { throw new SemanticException(e); - } finally { - IOUtils.closeStream(outStream); } } @@ -100,6 +114,14 @@ public Void execute() throws IOException { } } + public static boolean fileExists(Path filePath, HiveConf hiveConf) throws IOException { + FileSystem fs = filePath.getFileSystem(hiveConf); + if (fs.exists(filePath)) { + return true; + } + return false; + } + public static Iterable matchesDb(Hive db, String dbPattern) throws HiveException { if (dbPattern == null) { return db.getAllDatabases(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index f275194b6d..e538c79f34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -196,6 +196,10 @@ private void initializeIfNot() throws SemanticException { } public void write() throws SemanticException { + write(false); + } + + public void write(boolean replace) throws SemanticException { List> listValues = new ArrayList<>(); listValues.add( Arrays.asList( @@ -208,7 +212,6 @@ public void write() throws SemanticException { if (replScope != null) { listValues.add(prepareReplScopeValues()); } - Utils.writeOutput(listValues, dumpFile, hiveConf - ); + Utils.writeOutput(listValues, dumpFile, hiveConf, replace); } }
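Illustrative sketch (not part of the patch): the incremental checkpointing exercised by the tests above hinges on the new _events_dump marker (ReplAck.EVENTS_DUMP), which ReplDumpTask rewrites after every dumped event via Utils.writeOutput and reads back in getResumeFrom. A simplified, standalone restatement of that read/write protocol is sketched below; the class and method names (EventsDumpCheckpoint, writeLastDumpedEvent, readResumeFrom) are hypothetical, and only the file layout ("lastEventId" + tab + id) follows the patch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class EventsDumpCheckpoint {
  private static final String LAST_EVENT_ID_NAME = "lastEventId";

  // Overwrite the _events_dump marker with the id of the last fully dumped event.
  static void writeLastDumpedEvent(Path ackFile, long lastEventId, Configuration conf) throws IOException {
    FileSystem fs = ackFile.getFileSystem(conf);
    try (FSDataOutputStream out = fs.create(ackFile, true /* overwrite */)) {
      out.write((LAST_EVENT_ID_NAME + "\t" + lastEventId + "\n").getBytes(StandardCharsets.UTF_8));
    }
  }

  // Return the event id to resume from: the checkpointed id if the marker exists,
  // otherwise the supplied default (work.eventFrom in the patch).
  static long readResumeFrom(Path ackFile, long defaultEventId, Configuration conf) throws IOException {
    FileSystem fs = ackFile.getFileSystem(conf);
    if (!fs.exists(ackFile)) {
      return defaultEventId;
    }
    try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(ackFile), StandardCharsets.UTF_8))) {
      String line = br.readLine();
      return (line == null) ? defaultEventId : Long.parseLong(line.split("\t")[1]);
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path ack = new Path("file:///tmp/_events_dump"); // local stand-in for the real marker
    writeLastDumpedEvent(ack, 42L, conf);
    System.out.println(readResumeFrom(ack, -1L, conf)); // prints 42
  }
}

On resume, the dump loop skips every event id at or below the checkpoint, deletes the possibly half-written directory for resumeFrom + 1 (cleanFailedEventDirIfExists) and continues from there, which is what testIncrementalDumpCheckpointing verifies by deleting _finished_dump and a few trailing event directories before re-running the dump.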
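The testHdfsMaxDirItemsLimitDuringIncremental case can be read alongside a small restatement of getMaxEventAllowed; the class and method names below (MaxEventsPerDump, clamp) are hypothetical, while the values mirror ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG and RESERVED_DIR_ITEMS_COUNT from the patch.

// Hypothetical restatement of ReplDumpTask.getMaxEventAllowed() for illustration only.
public final class MaxEventsPerDump {
  static int clamp(int configuredEventLimit, int maxDirItems, int reservedItems) {
    if (maxDirItems <= 0) {
      return configuredEventLimit;             // dfs.namenode.fs-limits.max-directory-items not set
    }
    int usable = maxDirItems - reservedItems;  // leave room for _dumpmetadata, _events_dump, etc.
    return Math.min(configuredEventLimit, usable);
  }

  public static void main(String[] args) {
    // Values from the test: limit = RESERVED_DIR_ITEMS_COUNT + 5 = 15, reserved = 10.
    System.out.println(clamp(Integer.MAX_VALUE, 15, 10)); // prints 5
  }
}

With the directory-item limit set to RESERVED_DIR_ITEMS_COUNT + 5 = 15, only five events fit into one dump cycle, which is why the test asserts eventCount == 5 after the first incremental dump; the second dump, run with the limit raised to 1000, picks up the backlog (eventCount > 5) and the replica finally sees all ten rows in t1.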