diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 39722a0dd2..0d1a18737e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +34,8 @@ import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -47,6 +50,8 @@ import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -663,6 +668,529 @@ public void testMultiDBTxn() throws Throwable { } } + @Test + public void testIncrementalDumpCheckpointing() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {}); + + + //Case 1: When the last dump finished all the events and + //only _finished_dump file at the hiveDumpRoot was about to be written when it failed. 
+ ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .run("insert into t2 values (2)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + + long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > (firstIncEventID + 1)); + Map<Path, Long> pathModTimeMap = new HashMap<>(); + for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + assertTrue(fs.exists(ackFile)); + //check events were not rewritten. + for (Map.Entry<Path, Long> entry : pathModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), + fs.getFileStatus(new Path(hiveDumpDir, entry.getKey())).getModificationTime()); + } + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2"}); + + + //Case 2: When the last dump was half way through + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName) + .run("insert into t1 values (3)") + .run("insert into t2 values (4)") + .dump(primaryDbName); + + hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + + fs.delete(ackFile, false); + //delete last three events and test if it recovers.
+ long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId); + Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID)); + Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1)); + Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2)); + assertTrue(fs.exists(lastEvtRoot)); + assertTrue(fs.exists(secondLastEvtRoot)); + assertTrue(fs.exists(thirdLastEvtRoot)); + + pathModTimeMap = new HashMap<>(); + for (long idx = Long.parseLong(incrementalDump2.lastReplicationId)+1; idx < (lastEventID - 2); idx++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(idx)); + if (fs.exists(eventRoot)) { + for (FileStatus fileStatus: fs.listStatus(eventRoot)) { + pathModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + } + } + long lastEvtModTimeOld = fs.getFileStatus(lastEvtRoot).getModificationTime(); + long secondLastEvtModTimeOld = fs.getFileStatus(secondLastEvtRoot).getModificationTime(); + long thirdLastEvtModTimeOld = fs.getFileStatus(thirdLastEvtRoot).getModificationTime(); + + fs.delete(lastEvtRoot, true); + fs.delete(secondLastEvtRoot, true); + fs.delete(thirdLastEvtRoot, true); + org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID - 3), ackLastEventID, + primary.hiveConf); + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + + assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation); + + verifyPathExist(fs, ackFile); + verifyPathExist(fs, ackLastEventID); + verifyPathExist(fs, lastEvtRoot); + verifyPathExist(fs, secondLastEvtRoot); + verifyPathExist(fs, thirdLastEvtRoot); + assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > lastEvtModTimeOld); + assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > secondLastEvtModTimeOld); + assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > thirdLastEvtModTimeOld); + + //Check other event dump files have not been modified. 
+ for (Map.Entry entry:pathModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), fs.getFileStatus(entry.getKey()).getModificationTime()); + } + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "3"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2", "4"}); + } + + @Test + public void testIncrementalResumeDump() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackLastEventID, false); + fs.delete(ackFile, false); + //delete only last event root dir + Path lastEventRoot = new Path(hiveDumpDir, String.valueOf(incrementalDump1.lastReplicationId)); + assertTrue(fs.exists(lastEventRoot)); + fs.delete(lastEventRoot, true); + + // It should create a fresh dump dir as _events_dump doesn't exist. 
+ WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation); + assertTrue(incrementalDump1.lastReplicationId != incrementalDump2.lastReplicationId); + assertTrue(fs.getFileStatus(new Path(incrementalDump2.dumpLocation)).getModificationTime() + > fs.getFileStatus(new Path(incrementalDump1.dumpLocation)).getModificationTime()); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + } + + @Test + public void testIncrementalResumeDumpFromInvalidEventDumpFile() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackFile, false); + fs.delete(ackLastEventID, false); + //Case 1: File exists and it has some invalid content + FSDataOutputStream os = fs.create(ackLastEventID); + os.write("InvalidContent".getBytes()); + os.close(); + assertEquals("InvalidContent".length(), fs.getFileStatus(ackLastEventID).getLen()); + //delete only last event root dir + Path lastEventRoot = new Path(hiveDumpDir, String.valueOf(incrementalDump1.lastReplicationId)); + assertTrue(fs.exists(lastEventRoot)); + fs.delete(lastEventRoot, true); + + // It should create a fresh dump dir as _events_dump has some invalid content. 
+ WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation); + assertTrue(fs.getFileStatus(new Path(incrementalDump2.dumpLocation)).getModificationTime() + > fs.getFileStatus(new Path(incrementalDump1.dumpLocation)).getModificationTime()); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + //Case 2: File exists and it has no content + WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName) + .run("insert into t1 values (2)") + .dump(primaryDbName); + hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackFile, false); + fs.delete(ackLastEventID, false); + os = fs.create(ackLastEventID); + os.write("".getBytes()); + os.close(); + assertEquals(0, fs.getFileStatus(ackLastEventID).getLen()); + //delete only last event root dir + lastEventRoot = new Path(hiveDumpDir, String.valueOf(incrementalDump3.lastReplicationId)); + assertTrue(fs.exists(lastEventRoot)); + fs.delete(lastEventRoot, true); + + // It should create a fresh dump dir as _events_dump is empty. + WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + assertTrue(incrementalDump3.dumpLocation != incrementalDump4.dumpLocation); + assertTrue(fs.getFileStatus(new Path(incrementalDump4.dumpLocation)).getModificationTime() + > fs.getFileStatus(new Path(incrementalDump3.dumpLocation)).getModificationTime()); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2"}); + } + + @Test + public void testCheckpointingOnFirstEventDump() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {}); + + // Testing a scenario where first event was getting dumped and it failed. + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (1)") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackFile, false); + fs.delete(ackLastEventID, false); + fs.delete(dumpMetaData, false); + //delete all the event folder except first one. 
+ long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > (firstIncEventID + 1)); + + for (long eventId=firstIncEventID + 1; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + fs.delete(eventRoot, true); + } + } + + Path firstIncEventRoot = new Path(hiveDumpDir, String.valueOf(firstIncEventID)); + long firstIncEventModTimeOld = fs.getFileStatus(firstIncEventRoot).getModificationTime(); + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName); + + assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation); + hiveDumpDir = new Path(incrementalDump2.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + firstIncEventRoot = new Path(hiveDumpDir, String.valueOf(firstIncEventID)); + assertTrue(fs.exists(ackFile)); + long firstIncEventModTimeNew = fs.getFileStatus(firstIncEventRoot).getModificationTime(); + assertTrue(firstIncEventModTimeNew > firstIncEventModTimeOld); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + } + + @Test + public void testCheckpointingIncrWithTableDrop() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t2 values (2)") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"2"}); + + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("CREATE TABLE t3(a string) STORED AS TEXTFILE") + .run("insert into t3 values (3)") + .run("insert into t1 values (4)") + .run("DROP TABLE t2") + .dump(primaryDbName); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata"); + + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(dumpMetaData)); + + fs.delete(ackFile, false); + fs.delete(dumpMetaData, false); + //delete last five events + long fifthLastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId) - 4; + long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId); + assertTrue(lastIncEventID > fifthLastIncEventID); + + for (long eventId=fifthLastIncEventID + 1; eventId<=lastIncEventID; eventId++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId)); + if (fs.exists(eventRoot)) { + fs.delete(eventRoot, true); + } + } + + org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(fifthLastIncEventID), ackLastEventID, + primary.hiveConf); + + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + WarehouseInstance.Tuple incrementalDump2 
= primary.run("use " + primaryDbName) + .dump(primaryDbName); + + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + assertTrue(fs.exists(ackFile)); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "4"}) + .run("select * from " + replicatedDbName + ".t3") + .verifyResults(new String[] {"3"}) + .runFailure("select * from " + replicatedDbName + ".t2"); + } + + @Test + public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() throws Throwable { + List<String> dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("create table t1(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .dump(primaryDbName, dumpClause); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2"}); + + dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'"); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("create table t2(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t2 values (3)") + .run("insert into t2 values (4)") + .run("insert into t2 values (5)") + .dump(primaryDbName, dumpClause); + + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + Path bootstrapDir = new Path(hiveDumpDir, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME); + + Path t2dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME + File.separator + + primaryDbName + File.separator + "t2"); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf); + + verifyPathExist(fs, ackFile); + verifyPathExist(fs, ackLastEventID); + + long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime(); + long oldT2DatadirModTime = fs.getFileStatus(t2dataDir).getModificationTime(); + + fs.delete(ackFile, false); + + //Do another dump and test that the rewrite happened for metadata and not for the data folder + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName, dumpClause); + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); +
assertTrue(fs.exists(ackFile)); + verifyPathExist(fs, ackFile); + verifyPathExist(fs, ackLastEventID); + + long newMetadirModTime = fs.getFileStatus(metaDir).getModificationTime(); + long newT2DatadirModTime = fs.getFileStatus(t2dataDir).getModificationTime(); + + assertTrue(newMetadirModTime > oldMetadirModTime); + assertEquals(oldT2DatadirModTime, newT2DatadirModTime); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"3", "4", "5"}); + } + + @Test + public void testHdfsMaxDirItemsLimitDuringIncremental() throws Throwable { + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("create table t1(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t1 values (1)") + .dump(primaryDbName); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}); + + List dumpClause = Arrays.asList("'" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "'='" + + (ReplUtils.RESERVED_DIR_ITEMS_COUNT + 5) +"'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'"); + + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t1 values (4)") + .run("insert into t1 values (5)") + .run("insert into t1 values (6)") + .run("insert into t1 values (7)") + .run("insert into t1 values (8)") + .run("insert into t1 values (9)") + .run("insert into t1 values (10)") + .run("create table t2(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t2 values (100)") + .dump(primaryDbName, dumpClause); + + int eventCount = Integer.parseInt(incrementalDump1.lastReplicationId) + - Integer.parseInt(bootstrapDump.lastReplicationId); + assertEquals(eventCount, 5); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[] {"100"}); + + dumpClause = Arrays.asList("'" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "'='1000'"); + + WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName) + .dump(primaryDbName, dumpClause); + + eventCount = Integer.parseInt(incrementalDump2.lastReplicationId) + - Integer.parseInt(incrementalDump1.lastReplicationId); + assertTrue(eventCount > 5); + + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}); + } + @Test public void testCheckPointingDataDumpFailure() throws Throwable { //To force distcp copy @@ -692,6 +1220,7 @@ public void testCheckPointingDataDumpFailure() throws Throwable { Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase()); Path tablet1Path = new Path(dbDataPath, "t1"); Path tablet2Path = new Path(dbDataPath, "t2"); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2 fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); @@ -965,4 +1494,8 @@ public void testCheckPointingMetadataDumpFailure() throws Throwable { dumpPath = 
new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); } + + private void verifyPathExist(FileSystem fs, Path filePath) throws IOException { + assertTrue("Path not found:" + filePath, fs.exists(filePath)); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index e1b8b81f49..1adc4fbc1f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -925,7 +925,7 @@ public void testIncrementalDumpMultiIteration() throws Throwable { Path path = new Path(hiveDumpDir); FileSystem fs = path.getFileSystem(conf); FileStatus[] fileStatus = fs.listStatus(path); - int numEvents = fileStatus.length - 2; //one is metadata file and one is _dump ack + int numEvents = fileStatus.length - 3; //for _metadata, _finished_dump and _events_dump replica.load(replicatedDbName, primaryDbName, Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'")) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index b7a9888688..b078ea1c58 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -28,11 +30,14 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Assert; @@ -47,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -406,6 +412,95 @@ public void externalTableWithPartitions() throws Throwable { assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); } + @Test + public void externalTableIncrementalCheckpointing() throws Throwable { + List withClause = externalTableBasePathWithClause(); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + 
.run("insert into table t1 values (2)") + .run("create external table t2 (id int)") + .run("insert into table t2 values (3)") + .run("insert into table t2 values (4)") + .dump(primaryDbName, withClause); + + assertExternalFileInfo(Arrays.asList(new String[]{"t1", "t2"}), tuple.dumpLocation, primaryDbName, false); + + replica.load(replicatedDbName, primaryDbName) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("select * from t1") + .verifyResults(new String[] {"1", "2"}) + .run("select * from t2") + .verifyResults(new String[] {"3", "4"}) + .verifyReplTargetProperty(replicatedDbName); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + withClause = externalTableWithClause(true, true); + WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) + .run("drop table t1") + .run("insert into table t2 values (5)") + .run("insert into table t2 values (6)") + .run("create external table t3 (id int)") + .run("insert into table t3 values (8)") + .dump(primaryDbName, withClause); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t2", "t3"), incrementalDump1.dumpLocation, true); + + FileSystem fs = primary.miniDFSCluster.getFileSystem(); + Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); + Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString()); + assertTrue(fs.exists(ackFile)); + assertTrue(fs.exists(ackLastEventID)); + Path bootstrapDir = new Path(hiveDumpDir, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME); + Path dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME); + assertFalse(fs.exists(dataDir)); + long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime(); + fs.delete(ackFile, false); + fs.delete(ackLastEventID, false); + //delete all the event folders except first event + long startEvent = Long.valueOf(tuple.lastReplicationId) + 1; + Path startEventRoot = new Path(hiveDumpDir, String.valueOf(startEvent)); + Map firstEventModTimeMap = new HashMap<>(); + for (FileStatus fileStatus: fs.listStatus(startEventRoot)) { + firstEventModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime()); + } + long endEvent = Long.valueOf(incrementalDump1.lastReplicationId); + assertTrue(endEvent - startEvent > 1); + for (long eventDir = startEvent + 1; eventDir <= endEvent; eventDir++) { + Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventDir)); + if (fs.exists(eventRoot)) { + fs.delete(eventRoot, true); + } + } + Utils.writeOutput(String.valueOf(startEvent), ackLastEventID, primary.hiveConf); + WarehouseInstance.Tuple incrementalDump2 = primary.dump(primaryDbName, withClause); + assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); + assertTrue(fs.getFileStatus(metaDir).getModificationTime() > oldMetadirModTime); + assertExternalFileInfo(Arrays.asList("t2", "t3"), incrementalDump2.dumpLocation, true); + //first event meta is not rewritten + for (Map.Entry entry: firstEventModTimeMap.entrySet()) { + assertEquals((long)entry.getValue(), fs.getFileStatus(entry.getKey()).getModificationTime()); + } + replica.load(replicatedDbName, primaryDbName) + .status(replicatedDbName) + .verifyResult(incrementalDump2.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + 
.verifyFailure(new String[] {"t1"}) + .run("select * from t2") + .verifyResults(new String[] {"3", "4", "5", "6"}) + .run("select * from t3") + .verifyResult("8") + .verifyReplTargetProperty(replicatedDbName); + } + @Test public void externalTableIncrementalReplication() throws Throwable { List<String> withClause = externalTableBasePathWithClause(); @@ -563,7 +658,7 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { // _bootstrap//t2 // _bootstrap//t3 - Path dbPath = new Path(dumpPath, primaryDbName); + Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName); Path tblPath = new Path(dbPath, "t2"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); tblPath = new Path(dbPath, "t3"); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index 8b2c556ce6..e3e66612ec 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -502,9 +502,9 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { Path dumpPath = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME); assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); - // _bootstrap//t2 - // _bootstrap//t3 - Path dbPath = new Path(dumpPath, primaryDbName); + // _bootstrap/metadata//t2 + // _bootstrap/metadata//t3 + Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName); Path tblPath = new Path(dbPath, "t2"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); tblPath = new Path(dbPath, "t3"); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java index 4a64927bad..d99bf54f62 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -30,6 +34,10 @@ import org.junit.Test; import org.junit.BeforeClass; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import java.util.HashMap; @@ -38,6 +46,7 @@ * TestScheduledReplicationScenarios - test scheduled replication .
*/ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosAcidTables { + private static final long DEFAULT_PROBE_TIMEOUT = 2 * 60 * 1000L; // 2 minutes @BeforeClass public static void classLevelSetup() throws Exception { @@ -102,15 +111,19 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { .run("insert into t1 values(2)"); try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) { - int next = 0; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("create scheduled query s1_t1 every 10 minutes as repl dump " + primaryDbName); - primary.run("alter scheduled query s1_t1 execute"); - Thread.sleep(6000); - replica.run("create scheduled query s2_t1 every 10 minutes as repl load " + primaryDbName + " INTO " + int next = -1; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true); + primary.run("create scheduled query s1_t1 every 5 seconds as repl dump " + primaryDbName); + replica.run("create scheduled query s2_t1 every 5 seconds as repl load " + primaryDbName + " INTO " + replicatedDbName); - replica.run("alter scheduled query s2_t1 execute"); - Thread.sleep(20000); + Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf); + + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -121,12 +134,10 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { primary.run("use " + primaryDbName) .run("insert into t1 values(3)") .run("insert into t1 values(4)"); - next++; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("alter scheduled query s1_t1 execute"); - Thread.sleep(20000); - replica.run("alter scheduled query s2_t1 execute"); - Thread.sleep(20000); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -137,12 +148,10 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { primary.run("use " + primaryDbName) .run("insert into t1 values(5)") .run("insert into t1 values(6)"); - next++; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("alter scheduled query s1_t1 execute"); - Thread.sleep(30000); - replica.run("alter scheduled query s2_t1 execute"); - Thread.sleep(30000); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -168,15 +177,18 @@ public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { 
.run("insert into t2 values(2)"); try (ScheduledQueryExecutionService schqS = ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) { - int next = 0; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("create scheduled query s1_t2 every 10 minutes as repl dump " + primaryDbName + withClause); - primary.run("alter scheduled query s1_t2 execute"); - Thread.sleep(80000); - replica.run("create scheduled query s2_t2 every 10 minutes as repl load " + primaryDbName + " INTO " + int next = -1; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true); + primary.run("create scheduled query s1_t2 every 5 seconds as repl dump " + primaryDbName + withClause); + replica.run("create scheduled query s2_t2 every 5 seconds as repl load " + primaryDbName + " INTO " + replicatedDbName); - replica.run("alter scheduled query s2_t2 execute"); - Thread.sleep(80000); + Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + Path ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't2'") .verifyResult("t2") @@ -187,22 +199,37 @@ public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { primary.run("use " + primaryDbName) .run("insert into t2 values(3)") .run("insert into t2 values(4)"); - next++; - ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); - primary.run("alter scheduled query s1_t2 execute"); - Thread.sleep(80000); - replica.run("alter scheduled query s2_t2 execute"); - Thread.sleep(80000); + next = Integer.parseInt(ReplDumpWork.getTestInjectDumpDir()) + 1; + ackPath = new Path(dumpRoot, String.valueOf(next) + File.separator + ReplUtils.REPL_HIVE_BASE_DIR + + File.separator + ReplAck.LOAD_ACKNOWLEDGEMENT.toString()); + waitForAck(fs, ackPath, DEFAULT_PROBE_TIMEOUT); replica.run("use " + replicatedDbName) .run("show tables like 't2'") .verifyResult("t2") .run("select id from t2 order by id") .verifyResults(new String[]{"1", "2", "3", "4"}); - - } finally { primary.run("drop scheduled query s1_t2"); replica.run("drop scheduled query s2_t2"); } } + private void waitForAck(FileSystem fs, Path ackFile, long timeout) throws IOException { + long oldTime = System.currentTimeMillis(); + long sleepInterval = 2; + + while(true) { + if (fs.exists(ackFile)) { + return; + } + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + //no-op + } + if (System.currentTimeMillis() - oldTime > timeout) { + break; + } + } + throw new IOException("Timed out waiting for the ack file: " + ackFile.toString()); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 93e24ef852..5bdf5515e5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; @@ -242,13 +241,8 @@ private void verifyBootstrapDirInIncrementalDump(String dumpLocation, String[] b Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); // Check if the DB dump path have any tables other than the ones listed in bootstrappedTables. - Path dbPath = new Path(dumpPath, primaryDbName); - FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath, new PathFilter() { - @Override - public boolean accept(Path path) { - return !path.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); - } - }); + Path dbPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME + File.separator + primaryDbName); + FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath); Assert.assertEquals(fileStatuses.length, bootstrappedTables.length); // Eg: _bootstrap//t2, _bootstrap//t3 etc diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java index db8db5f8e7..d9c873b332 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -22,6 +22,7 @@ */ public enum ReplAck { DUMP_ACKNOWLEDGEMENT("_finished_dump"), + EVENTS_DUMP("_events_dump"), LOAD_ACKNOWLEDGEMENT("_finished_load"); private String ack; ReplAck(String ack) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 2e0af02094..b2b6497f4d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; +import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -78,9 +79,13 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; import java.util.HashSet; @@ -132,25 +137,23 @@ public int execute() { if (work.tableDataCopyIteratorsInitialized()) { initiateDataCopyTasks(); } else { - Hive hiveDb = getHive(); - Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), - Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() - .getBytes(StandardCharsets.UTF_8.name()))); + Path dumpRoot = getEncodedDumpRootPath(); Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); + boolean isBootstrap = (previousValidHiveDumpPath == null); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. 
if (shouldDump(previousValidHiveDumpPath)) { - Path currentDumpPath = getCurrentDumpPath(dumpRoot); + Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; - if (previousValidHiveDumpPath == null) { - lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + if (isBootstrap) { + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive()); } else { work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath)); - lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive()); } work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); work.setCurrentDumpPath(currentDumpPath); @@ -167,17 +170,24 @@ public int execute() { return 0; } - private Path getCurrentDumpPath(Path dumpRoot) throws IOException { - Path previousDumpPath = getPreviousDumpPath(dumpRoot); - if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) { + private Path getEncodedDumpRootPath() throws UnsupportedEncodingException { + return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() + .getBytes(StandardCharsets.UTF_8.name()))); + } + + private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOException { + Path lastDumpPath = getLatestDumpPath(dumpRoot); + if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, isBootstrap)) { //Resume previous dump - return previousDumpPath; + LOG.info("Resuming the dump with existing dump directory {}", lastDumpPath); + return lastDumpPath; } else { return new Path(dumpRoot, getNextDumpDir()); } } - private void initiateDataCopyTasks() throws SemanticException, IOException { + private void initiateDataCopyTasks() throws SemanticException { TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); List> childTasks = new ArrayList<>(); childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf)); @@ -191,7 +201,7 @@ private void initiateDataCopyTasks() throws SemanticException, IOException { } } - private void finishRemainingTasks() throws SemanticException, IOException { + private void finishRemainingTasks() throws SemanticException { prepareReturnValues(work.getResultValues()); Path dumpAckFile = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR + File.separator @@ -225,7 +235,7 @@ private void deleteAllPreviousDumpMeta(Path currentDumpPath) { } private Path getDumpRoot(Path currentDumpPath) { - if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) { //testDeleteDumpMetaDumpPath to be used only for test. 
return null; } else { @@ -425,11 +435,16 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(hiveDb); + + int maxEventLimit = getMaxEventAllowed(work.maxEventLimit()); EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( - evFetcher, work.eventFrom, work.maxEventLimit(), evFilter); + evFetcher, work.eventFrom, maxEventLimit, evFilter); lastReplId = work.eventTo; + Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString()); + long resumeFrom = Utils.fileExists(ackFile, conf) ? getResumeFrom(ackFile) : work.eventFrom; + // Right now the only pattern allowed to be specified is *, which matches all the database // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter // on database name when it's *. In future, if we support more elaborate patterns, we will @@ -438,16 +453,24 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?"; - int maxEventLimit = work.maxEventLimit(); replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit), work.eventFrom, work.eventTo, maxEventLimit); replLogger.startLog(); + long dumpedCount = resumeFrom - work.eventFrom; + if (dumpedCount > 0) { + LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); + } + cleanFailedEventDirIfExists(dumpRoot, resumeFrom); while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); + if (ev.getEventId() <= resumeFrom) { + continue; + } Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); + Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf); } replLogger.endLog(lastReplId.toString()); @@ -460,7 +483,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive if (work.oldReplScope != null) { dmd.setReplScope(work.replScope); } - dmd.write(); + dmd.write(true); // Examine all the tables if required. if (shouldExamineTablesToDump() || (tableList != null)) { @@ -470,8 +493,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); } + /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata. + We need to rewrite the metadata as the write id list will be changed. + We can't reuse the previous write id as it might be invalid due to compaction. 
*/ + Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME); + FileSystem fs = FileSystem.get(metadataPath.toUri(), conf); + if (fs.exists(metadataPath)) { + fs.delete(metadataPath, true); + } + Path dbRootMetadata = new Path(metadataPath, dbName); + Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); managedTableCopyPaths = new ArrayList<>(); - Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); List extTableLocations = new LinkedList<>(); try (Writer writer = new Writer(dumpRoot, conf)) { for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { @@ -487,10 +520,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - Path dbDataRoot = new Path(dbRoot, EximUtil.DATA_PATH_NAME); managedTableCopyPaths.addAll( - dumpTable(dbName, tableName, validTxnList, - dbRoot, dbDataRoot, bootDumpBeginReplId, + dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, hiveDb, tableTuple)); } if (tableList != null && isTableSatifiesConfig(table)) { @@ -511,6 +542,59 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive return lastReplId; } + private int getMaxEventAllowed(int currentEventMaxLimit) { + int maxDirItems = Integer.parseInt(conf.get(ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG, "0")); + if (maxDirItems > 0) { + maxDirItems = maxDirItems - ReplUtils.RESERVED_DIR_ITEMS_COUNT; + if (maxDirItems < currentEventMaxLimit) { + LOG.warn("Changing the maxEventLimit from {} to {} as the '" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + + "' limit encountered. Set this config appropriately to increase the maxEventLimit", + currentEventMaxLimit, maxDirItems); + currentEventMaxLimit = maxDirItems; + } + } + return currentEventMaxLimit; + } + + private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws IOException { + Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1)); + Retry<Void> retriable = new Retry<Void>(IOException.class) { + @Override + public Void execute() throws IOException { + FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf); + if (fs.exists(nextEventRoot)) { + fs.delete(nextEventRoot, true); + } + return null; + } + }; + try { + retriable.run(); + } catch (Exception e) { + throw new IOException(e); + } + } + + private long getResumeFrom(Path ackFile) throws SemanticException { + BufferedReader br = null; + try { + FileSystem fs = ackFile.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset())); + long lastEventID = Long.parseLong(br.readLine()); + return lastEventID; + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + private boolean needBootstrapAcidTablesDuringIncrementalDump() { // If acid table dump is not enabled, then no need to check further.
if (!ReplUtils.includeAcidTableInDump(conf)) { @@ -526,13 +610,6 @@ private boolean needBootstrapAcidTablesDuringIncrementalDump() { || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); } - private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) { - if (isIncrementalPhase) { - dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); - } - return new Path(dumpRoot, dbName); - } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, @@ -715,7 +792,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) LOG.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); - dmd.write(); + dmd.write(true); work.setDirCopyIterator(extTableCopyWorks.iterator()); work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); @@ -731,9 +808,23 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { } } - private boolean shouldResumePreviousDump(Path dumpPath) { - Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR); - return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + private boolean shouldResumePreviousDump(Path lastDumpPath, boolean isBootStrap) throws IOException { + if (validDump(lastDumpPath)) { + return false; + } + Path hiveDumpPath = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + if (isBootStrap) { + return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + } + // In case of incremental we should resume if _events_dump file is present and is valid + Path lastEventFile = new Path(hiveDumpPath, ReplAck.EVENTS_DUMP.toString()); + long resumeFrom = 0; + try { + resumeFrom = getResumeFrom(lastEventFile); + } catch (SemanticException ex) { + LOG.info("Could not get last repl id from {}, because of: {}", lastEventFile, ex.getMessage()); + } + return resumeFrom > 0L; } long currentNotificationId(Hive hiveDb) throws TException { @@ -742,7 +833,7 @@ long currentNotificationId(Hive hiveDb) throws TException { Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception { // TODO : instantiating FS objects are generally costly. Refactor - Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false); + Path dbRoot = new Path(metadataRoot, dbName); FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); HiveWrapper.Tuple database = new HiveWrapper(hiveDb, dbName, lastReplId).database(); @@ -854,10 +945,11 @@ private String getNextDumpDir() { // make it easy to write .q unit tests, instead of unique id generation. // however, this does mean that in writing tests, we have to be aware that // repl dump will clash with prior dumps, and thus have to clean up properly.
-      if (ReplDumpWork.testInjectDumpDir == null) {
+      String nextDump = ReplDumpWork.getInjectNextDumpDirForTest();
+      if (nextDump == null) {
         return "next";
       } else {
-        return ReplDumpWork.testInjectDumpDir;
+        return nextDump;
       }
     } else {
       return UUID.randomUUID().toString();
@@ -867,7 +959,7 @@ private String getNextDumpDir() {
     }
   }
 
-  private Path getPreviousDumpPath(Path dumpRoot) throws IOException {
+  private Path getLatestDumpPath(Path dumpRoot) throws IOException {
     FileSystem fs = dumpRoot.getFileSystem(conf);
     if (fs.exists(dumpRoot)) {
       FileStatus[] statuses = fs.listStatus(dumpRoot);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 1f0d70212c..0e50ec61a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -47,7 +48,8 @@
   final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
   Long eventTo;
   Long eventFrom;
-  static String testInjectDumpDir = null;
+  private static String testInjectDumpDir = null;
+  private static boolean testInjectDumpDirAutoIncrement = false;
   static boolean testDeletePreviousDumpMetaPath = false;
   private Integer maxEventLimit;
   private transient Iterator<DirCopyWork> dirCopyIterator;
@@ -56,7 +57,22 @@
   private List<String> resultValues;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
+    injectNextDumpDirForTest(dumpDir, false);
+  }
+
+  public static void injectNextDumpDirForTest(String dumpDir, boolean autoIncr) {
     testInjectDumpDir = dumpDir;
+    testInjectDumpDirAutoIncrement = autoIncr;
+  }
+
+  public static synchronized String getTestInjectDumpDir() {
+    return testInjectDumpDir;
+  }
+
+  public static synchronized String getInjectNextDumpDirForTest() {
+    if (testInjectDumpDirAutoIncrement) {
+      testInjectDumpDir = String.valueOf(Integer.parseInt(testInjectDumpDir) + 1);
+    }
+    return testInjectDumpDir;
   }
 
   public static void testDeletePreviousDumpMetaPath(boolean failDeleteDumpMeta) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 56efa32cb6..f072effc31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -84,8 +84,8 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
       Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
       FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
       if (fs.exists(incBootstrapDir)) {
-        this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn,
-            true, hiveConf);
+        this.bootstrapIterator = new BootstrapEventsIterator(
+            new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true, hiveConf);
         this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       } else {
         this.bootstrapIterator = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index a311f7ae22..4cf316a42c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -180,9 +180,6 @@ public BootstrapEvent next() {
   }
 
   private Path getDbLevelDataPath() {
-    if (dbLevelPath.toString().contains(Path.SEPARATOR + ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME + Path.SEPARATOR)) {
-      return new Path(dbLevelPath, EximUtil.DATA_PATH_NAME);
-    }
     return new Path(new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME), dbLevelPath.getName());
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 939cbc3a35..4fcee0e34a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -94,6 +94,12 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
+
+  // HDFS Config to define the maximum number of items a directory may contain.
+  public static final String DFS_MAX_DIR_ITEMS_CONFIG = "dfs.namenode.fs-limits.max-directory-items";
+
+  // Reserved number of items to accommodate operational files in the dump root dir.
+  public static final int RESERVED_DIR_ITEMS_COUNT = 10;
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
@@ -236,7 +242,6 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
       try {
         return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
                 && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
                 && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
@@ -248,7 +253,6 @@ public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) {
     return p -> {
       try {
         return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
                 && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 73dc606d87..3de583276e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -114,7 +114,7 @@
       String threadName = Thread.currentThread().getName();
       LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
       try {
-        // this the data copy
+        // Data Copy in case of ExportTask
        List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf);
        Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index bdf09ac92b..3b49801138 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -64,22 +64,60 @@
 
   public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
-    DataOutputStream outStream = null;
+    writeOutput(listValues, outputFile, hiveConf, false);
+  }
+
+  public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
+      throws SemanticException {
+    Retry<Void> retriable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws IOException {
+        DataOutputStream outStream = null;
+        try {
+          FileSystem fs = outputFile.getFileSystem(hiveConf);
+          outStream = fs.create(outputFile, update);
+          for (List<String> values : listValues) {
+            outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+            for (int i = 1; i < values.size(); i++) {
+              outStream.write(Utilities.tabCode);
+              outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+            }
+            outStream.write(Utilities.newLineCode);
+          }
+        } finally {
+          IOUtils.closeStream(outStream);
+        }
+        return null;
+      }
+    };
     try {
-      FileSystem fs = outputFile.getFileSystem(hiveConf);
-      outStream = fs.create(outputFile);
-      for (List<String> values : listValues) {
-        outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
-        for (int i = 1; i < values.size(); i++) {
-          outStream.write(Utilities.tabCode);
-          outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+      retriable.run();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  public static void writeOutput(String content, Path outputFile, HiveConf hiveConf)
+      throws SemanticException {
+    Retry<Void> retriable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws IOException {
+        DataOutputStream outStream = null;
+        try {
+          FileSystem fs = outputFile.getFileSystem(hiveConf);
+          outStream = fs.create(outputFile);
+          outStream.writeBytes(content);
+          outStream.write(Utilities.newLineCode);
+        } finally {
+          IOUtils.closeStream(outStream);
         }
-        outStream.write(Utilities.newLineCode);
+        return null;
       }
-    } catch (IOException e) {
+    };
+    try {
+      retriable.run();
+    } catch (Exception e) {
       throw new SemanticException(e);
-    } finally {
-      IOUtils.closeStream(outStream);
     }
   }
 
@@ -100,6 +138,14 @@ public Void execute() throws IOException {
     }
   }
 
+  public static boolean fileExists(Path filePath, HiveConf hiveConf) throws IOException {
+    FileSystem fs = filePath.getFileSystem(hiveConf);
+    if (fs.exists(filePath)) {
+      return true;
+    }
+    return false;
+  }
+
   public static Iterable<? extends String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 85065326b3..6462e17515 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -31,9 +30,6 @@
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.Iterator;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -117,14 +113,6 @@ public void handle(Context withinContext) throws Exception {
     withinContext.createDmd(this).write();
   }
 
-  private BufferedWriter writer(Context withinContext, Partition qlPtn)
-      throws IOException {
-    Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
-    FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
-    Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
   @Override
   public DumpType dumpType() {
     return DumpType.EVENT_ADD_PARTITION;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 701dd6b57e..09662dacf9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -30,9 +29,6 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.Collections;
 import java.util.List;
 
@@ -108,12 +104,6 @@ public void handle(Context withinContext) throws Exception {
     return new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, insertMsg.getPtnObj());
   }
 
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
-    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
   @Override
   public DumpType dumpType() {
     return DumpType.EVENT_INSERT;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index f275194b6d..e538c79f34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -196,6 +196,10 @@ private void initializeIfNot() throws SemanticException {
   }
 
   public void write() throws SemanticException {
+    write(false);
+  }
+
+  public void write(boolean replace) throws SemanticException {
     List<List<String>> listValues = new ArrayList<>();
     listValues.add(
         Arrays.asList(
@@ -208,7 +212,6 @@ public void write() throws SemanticException {
     if (replScope != null) {
       listValues.add(prepareReplScopeValues());
     }
-    Utils.writeOutput(listValues, dumpFile, hiveConf
-    );
+    Utils.writeOutput(listValues, dumpFile, hiveConf, replace);
   }
 }
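
Reviewer note (illustrative, not part of the patch): the resume behaviour the new ReplDumpTask code depends on is that the last fully dumped event id is persisted, via the retriable Utils.writeOutput overloads, to the events-dump ack file under the hive dump root; a re-run of REPL DUMP reads that id back (getResumeFrom), deletes the possibly half-written directory of the following event (cleanFailedEventDirIfExists), and then skips every event at or below the checkpoint. The standalone sketch below mirrors that pattern under stated assumptions: the class name CheckpointSketch and the literal file name "_events_dump" are invented for illustration, not identifiers taken from this patch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch of the checkpoint/resume idea, written against plain Hadoop FileSystem APIs.
public class CheckpointSketch {

  private final FileSystem fs;
  private final Path dumpDir;    // e.g. <dumpRoot>/hive
  private final Path eventsAck;  // ack file name assumed to be "_events_dump"

  public CheckpointSketch(Configuration conf, Path dumpDir) throws IOException {
    this.fs = dumpDir.getFileSystem(conf);
    this.dumpDir = dumpDir;
    this.eventsAck = new Path(dumpDir, "_events_dump");
  }

  /** Returns the last fully dumped event id, or 0 if nothing was checkpointed yet. */
  long resumeFrom() throws IOException {
    if (!fs.exists(eventsAck)) {
      return 0L;
    }
    try (BufferedReader br = new BufferedReader(
        new InputStreamReader(fs.open(eventsAck), StandardCharsets.UTF_8))) {
      String line = br.readLine();
      return line == null ? 0L : Long.parseLong(line.trim());
    }
  }

  /** Deletes the possibly half-written directory of the event right after the checkpoint. */
  void cleanNextEventDir(long lastCheckpointedId) throws IOException {
    Path next = new Path(dumpDir, String.valueOf(lastCheckpointedId + 1));
    if (fs.exists(next)) {
      fs.delete(next, true);
    }
  }

  /** Overwrites the checkpoint once an event directory has been completely written. */
  void checkpoint(long eventId) throws IOException {
    try (FSDataOutputStream out = fs.create(eventsAck, true /* overwrite */)) {
      out.writeBytes(eventId + "\n");
    }
  }
}

A dump loop would call checkpoint(eventId) after each event directory is complete; on restart it would call resumeFrom(), then cleanNextEventDir(lastId), and continue dumping from lastId + 1.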