diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 39722a0dd2..3a80f35e4d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -33,6 +33,9 @@
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -47,6 +50,9 @@
 import javax.annotation.Nullable;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -55,6 +61,7 @@
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.parse.EximUtil.LAST_EVENT_ID_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -663,6 +670,261 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+        .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+        .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only _finished_dump file at the hiveDumpRoot was about to be written when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+        .run("insert into t1 values (1)")
+        .run("insert into t2 values (2)")
+        .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    Map<String, Long> eventModTimeMap = new HashMap<>();
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId = firstIncEventID; eventId <= lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        eventModTimeMap.put(String.valueOf(eventId), fs.getFileStatus(eventRoot).getModificationTime());
+      }
+    }
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName)
+        .dump(primaryDbName);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    //check events were not rewritten.
+    for (Map.Entry<String, Long> entry : eventModTimeMap.entrySet()) {
+      long oldModTime = entry.getValue();
+      long newModTime = fs.getFileStatus(new Path(hiveDumpDir, entry.getKey())).getModificationTime();
+      assertEquals(oldModTime, newModTime);
+    }
+
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[] {"2"});
+
+
+    //Case 2: When the last dump was half way through
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + primaryDbName)
+        .run("insert into t1 values (3)")
+        .run("insert into t2 values (4)")
+        .dump(primaryDbName);
+
+    hiveDumpDir = new Path(incrementalDump3.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+    //delete last three events and test if it recovers.
+    long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId);
+    Path lastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID));
+    Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 1));
+    Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + String.valueOf(lastEventID - 2));
+    assertTrue(fs.exists(lastEvtRoot));
+    assertTrue(fs.exists(secondLastEvtRoot));
+    assertTrue(fs.exists(thirdLastEvtRoot));
+
+    long lastEvtModTimeOld = fs.getFileStatus(lastEvtRoot).getModificationTime();
+    long secondLastEvtModTimeOld = fs.getFileStatus(secondLastEvtRoot).getModificationTime();
+    long thirdLastEvtModTimeOld = fs.getFileStatus(thirdLastEvtRoot).getModificationTime();
+
+    fs.delete(lastEvtRoot, true);
+    fs.delete(secondLastEvtRoot, true);
+    fs.delete(thirdLastEvtRoot, true);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+        Arrays.asList(
+            LAST_EVENT_ID_NAME,
+            String.valueOf(lastEventID - 3)
+        )
+    );
+    org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, ackLastEventID, primary.hiveConf, true);
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + primaryDbName)
+        .dump(primaryDbName);
+
+    assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+    verifyPathExist(fs, lastEvtRoot);
+    verifyPathExist(fs, secondLastEvtRoot);
+    verifyPathExist(fs, thirdLastEvtRoot);
+    assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > lastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > secondLastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > thirdLastEvtModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "3"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[] {"2", "4"});
+  }
+
+  @Test
+  public void testCheckpointingOnFirstEventDump() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+        .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {});
+
+    // Testing a scenario where the first event was getting dumped and it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+        .run("insert into t1 values (1)")
+        .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata");
+
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+    assertTrue(fs.exists(dumpMetaData));
+
+    fs.delete(ackFile, false);
+    fs.delete(ackLastEventID, false);
+    fs.delete(dumpMetaData, false);
+    //delete all the event folders except the first one.
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId = firstIncEventID + 1; eventId <= lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        fs.delete(eventRoot, true);
+      }
+    }
+
+    Path firstIncEventRoot = new Path(hiveDumpDir, String.valueOf(firstIncEventID));
+    long firstIncEventModTimeOld = fs.getFileStatus(firstIncEventRoot).getModificationTime();
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName)
+        .dump(primaryDbName);
+
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    assertTrue(fs.exists(ackFile));
+    long firstIncEventModTimeNew = fs.getFileStatus(firstIncEventRoot).getModificationTime();
+    assertTrue(firstIncEventModTimeNew > firstIncEventModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1"});
+  }
+
+
+  @Test
+  public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() throws Throwable {
+    List<String> dumpClause = Arrays.asList(
+        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+        "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+        "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" +
+            UserGroupInformation.getCurrentUser().getUserName() + "'",
+        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("create table t1(a int) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("insert into t1 values (1)")
+        .run("insert into t1 values (2)")
+        .dump(primaryDbName, dumpClause);
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "2"});
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName)
+        .run("create table t2(a int) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("insert into t2 values (3)")
+        .run("insert into t2 values (4)")
+        .run("insert into t2 values (5)")
+        .dump(primaryDbName, dumpClause);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    Path bootstrapDir = new Path(hiveDumpDir, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+    Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME);
+
+    Path t2dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME + File.separator +
+        primaryDbName + File.separator + "t2");
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+
+    long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
+    long oldT2DatadirModTime = fs.getFileStatus(t2dataDir).getModificationTime();
+
+    fs.delete(ackFile, false);
+
+    //Do another dump and test that the rewrite happened for meta and there was no write for the data folder
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, dumpClause);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+
+    long newMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
+    long newT2DatadirModTime = fs.getFileStatus(t2dataDir).getModificationTime();
+
+    assertTrue(newMetadirModTime > oldMetadirModTime);
+    assertEquals(oldT2DatadirModTime, newT2DatadirModTime);
+
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[] {"3", "4", "5"});
+  }
+
 @Test
 public void testCheckPointingDataDumpFailure() throws Throwable {
   //To force distcp copy
@@ -692,6 +954,7 @@ public void testCheckPointingDataDumpFailure() throws Throwable {
     Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase());
     Path tablet1Path = new Path(dbDataPath, "t1");
     Path tablet2Path = new Path(dbDataPath, "t2");
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
     //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
     fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
     assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
@@ -965,4 +1228,8 @@ public void testCheckPointingMetadataDumpFailure() throws Throwable {
     dumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
   }
+
+  private void verifyPathExist(FileSystem fs, Path filePath) throws IOException {
+    assertTrue("Path not found:" + filePath, fs.exists(filePath));
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
index db8db5f8e7..d9c873b332 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
@@ -22,6 +22,7 @@
  */
 public enum ReplAck {
   DUMP_ACKNOWLEDGEMENT("_finished_dump"),
+  EVENTS_DUMP("_events_dump"),
   LOAD_ACKNOWLEDGEMENT("_finished_load");
   private String ack;
   ReplAck(String ack) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 2e0af02094..76817cfba0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
+import org.apache.hadoop.hive.metastore.utils.Retry;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -78,8 +79,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
@@ -94,6 +97,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.parse.EximUtil.LAST_EVENT_ID_NAME;
 
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -139,7 +143,7 @@ public int execute() {
       Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
       //If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
       if (shouldDump(previousValidHiveDumpPath)) {
-        Path currentDumpPath = getCurrentDumpPath(dumpRoot);
+        Path currentDumpPath = getCurrentDumpPath(dumpRoot, previousValidHiveDumpPath);
         Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
         DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
         // Initialize ReplChangeManager instance since we will require it to encode file URI.
@@ -167,11 +171,12 @@ public int execute() {
     return 0;
   }
 
-  private Path getCurrentDumpPath(Path dumpRoot) throws IOException {
-    Path previousDumpPath = getPreviousDumpPath(dumpRoot);
-    if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) {
+  private Path getCurrentDumpPath(Path dumpRoot, Path previousValidHiveDumpPath) throws IOException {
+    Path lastDumpPath = getLatestDumpPath(dumpRoot);
+    if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, previousValidHiveDumpPath)) {
       //Resume previous dump
-      return previousDumpPath;
+      LOG.info("Resuming the dump with existing dump directory {}", lastDumpPath);
+      return lastDumpPath;
     } else {
       return new Path(dumpRoot, getNextDumpDir());
     }
@@ -225,7 +230,7 @@ private void deleteAllPreviousDumpMeta(Path currentDumpPath) {
   }
 
   private Path getDumpRoot(Path currentDumpPath) {
-    if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+    if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
      //testDeleteDumpMetaDumpPath to be used only for test.
       return null;
     } else {
@@ -425,11 +430,20 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     EventUtils.MSClientNotificationFetcher evFetcher =
         new EventUtils.MSClientNotificationFetcher(hiveDb);
 
+    int maxEventLimit = work.maxEventLimit();
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
-        evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
+        evFetcher, work.eventFrom, maxEventLimit, evFilter);
 
     lastReplId = work.eventTo;
+    Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString());
+    Long resumeFrom = work.eventFrom;
+    boolean ackExisted = Utils.create(ackFile, conf, true);
+    if (ackExisted) {
+      resumeFrom = getResumeFrom(ackFile, work.eventFrom);
+    }
+
     // Right now the only pattern allowed to be specified is *, which matches all the database
     // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter
     // on database name when it's *. In future, if we support more elaborate patterns, we will
@@ -438,16 +452,24 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ?
         work.dbNameOrPattern : "?";
-    int maxEventLimit = work.maxEventLimit();
     replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
         evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit),
         work.eventFrom, work.eventTo, maxEventLimit);
     replLogger.startLog();
+    Long dumpedCount = resumeFrom - work.eventFrom;
+    if (dumpedCount > 0) {
+      LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount);
+    }
+    cleanFailedEventDirIfExists(dumpRoot, resumeFrom);
     while (evIter.hasNext()) {
       NotificationEvent ev = evIter.next();
       lastReplId = ev.getEventId();
       Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
+      if (ev.getEventId() <= resumeFrom) {
+        continue;
+      }
       dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb);
+      updateLastEventDumped(ackFile, lastReplId);
     }
 
     replLogger.endLog(lastReplId.toString());
@@ -460,7 +482,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     if (work.oldReplScope != null) {
       dmd.setReplScope(work.replScope);
     }
-    dmd.write();
+    dmd.write(true);
 
     // Examine all the tables if required.
     if (shouldExamineTablesToDump() || (tableList != null)) {
@@ -470,8 +492,18 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
       validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
     }
+    /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata.
+       We need to rewrite the metadata as the write id list will be changed.
+       We can't reuse the previous write id as it might be invalid due to compaction. */
+    Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+    Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME);
+    FileSystem fs = FileSystem.get(metadataPath.toUri(), conf);
+    if (fs.exists(metadataPath)) {
+      fs.delete(metadataPath, true);
+    }
+    Path dbRootMetadata = new Path(metadataPath, dbName);
+    Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
     managedTableCopyPaths = new ArrayList<>();
-    Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
     List<String> extTableLocations = new LinkedList<>();
     try (Writer writer = new Writer(dumpRoot, conf)) {
       for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
@@ -487,10 +519,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         // Dump the table to be bootstrapped if required.
         if (shouldBootstrapDumpTable(table)) {
           HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
-          Path dbDataRoot = new Path(dbRoot, EximUtil.DATA_PATH_NAME);
           managedTableCopyPaths.addAll(
-              dumpTable(dbName, tableName, validTxnList,
-                  dbRoot, dbDataRoot, bootDumpBeginReplId,
+              dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId,
                   hiveDb, tableTuple));
         }
         if (tableList != null && isTableSatifiesConfig(table)) {
@@ -511,6 +541,63 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     return lastReplId;
   }
 
+  private void cleanFailedEventDirIfExists(Path dumpDir, Long resumeFrom) throws IOException {
+    Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1));
+    Retry<Void> retriable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws IOException {
+        FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf);
+        if (fs.exists(nextEventRoot)) {
+          fs.delete(nextEventRoot, true);
+        }
+        return null;
+      }
+    };
+    try {
+      retriable.run();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void updateLastEventDumped(Path ackFile, Long lastReplId) throws SemanticException {
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+        Arrays.asList(
+            LAST_EVENT_ID_NAME,
+            String.valueOf(lastReplId)
+        )
+    );
+    Utils.writeOutput(listValues, ackFile, conf, true);
+  }
+
+  private long getResumeFrom(Path ackFile, long defID) throws SemanticException {
+    long lastEventID = defID;
+    BufferedReader br = null;
+    try {
+      FileSystem fs = ackFile.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(ackFile)));
+      String line;
+      if ((line = br.readLine()) != null) {
+        String[] lineContents = line.split("\t", 5);
+        lastEventID = Long.parseLong(lineContents[1]);
+      } else {
+        LOG.warn("Unable to read lastEventID from ackFile:{}, defaulting to:{}", ackFile.toUri().toString(), defID);
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Exception while obtaining last event id: {}", ioe.getMessage());
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+    return lastEventID;
+  }
+
   private boolean needBootstrapAcidTablesDuringIncrementalDump() {
     // If acid table dump is not enabled, then no neeed to check further.
     if (!ReplUtils.includeAcidTableInDump(conf)) {
@@ -526,13 +613,6 @@ private boolean needBootstrapAcidTablesDuringIncrementalDump() {
         || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES));
   }
 
-  private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) {
-    if (isIncrementalPhase) {
-      dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
-    }
-    return new Path(dumpRoot, dbName);
-  }
-
   private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception {
     EventHandler.Context context = new EventHandler.Context(
         evRoot,
@@ -715,7 +795,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
     LOG.info("Preparing to return {},{}->{}",
         dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
     dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
-    dmd.write();
+    dmd.write(true);
 
     work.setDirCopyIterator(extTableCopyWorks.iterator());
     work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
@@ -731,9 +811,17 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
     }
   }
 
-  private boolean shouldResumePreviousDump(Path dumpPath) {
-    Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-    return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+  private boolean shouldResumePreviousDump(Path lastDumpPath, Path previousValidHiveDumpPath) throws IOException {
+    if (validDump(lastDumpPath)) {
+      return false;
+    }
+    // In case of incremental _dumpmetadata may not be present, still we should resume.
+    if (previousValidHiveDumpPath != null) {
+      return true;
+    } else {
+      Path hiveDumpPath = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+      return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+    }
   }
 
   long currentNotificationId(Hive hiveDb) throws TException {
@@ -742,7 +830,7 @@ long currentNotificationId(Hive hiveDb) throws TException {
   Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception {
     // TODO : instantiating FS objects are generally costly. Refactor
-    Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false);
+    Path dbRoot = new Path(metadataRoot, dbName);
     FileSystem fs = dbRoot.getFileSystem(conf);
     Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
     HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database();
@@ -867,7 +955,7 @@ private String getNextDumpDir() {
     }
   }
 
-  private Path getPreviousDumpPath(Path dumpRoot) throws IOException {
+  private Path getLatestDumpPath(Path dumpRoot) throws IOException {
     FileSystem fs = dumpRoot.getFileSystem(conf);
     if (fs.exists(dumpRoot)) {
       FileStatus[] statuses = fs.listStatus(dumpRoot);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 56efa32cb6..6a43cdc0f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.exec.Task;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -84,8 +85,8 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
       Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
       FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
       if (fs.exists(incBootstrapDir)) {
-        this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn,
-            true, hiveConf);
+        this.bootstrapIterator = new BootstrapEventsIterator(
+            new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true, hiveConf);
         this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       } else {
         this.bootstrapIterator = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index a311f7ae22..4cf316a42c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -180,9 +180,6 @@ public BootstrapEvent next() {
   }
 
   private Path getDbLevelDataPath() {
-    if (dbLevelPath.toString().contains(Path.SEPARATOR + ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME + Path.SEPARATOR)) {
-      return new Path(dbLevelPath, EximUtil.DATA_PATH_NAME);
-    }
     return new Path(new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME), dbLevelPath.getName());
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 939cbc3a35..b55de96db7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -236,7 +236,6 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
       try {
        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
                 && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
                 && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
@@ -248,7 +247,6 @@ public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) {
     return p -> {
       try {
         return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
                 && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 5ada55f31e..80e9ad08f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -75,6 +75,7 @@
   public static final String FILES_NAME = "_files";
   public static final String DATA_PATH_NAME = "data";
   public static final String METADATA_PATH_NAME = "metadata";
+  public static final String LAST_EVENT_ID_NAME = "LastEventID";
 
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 73dc606d87..3de583276e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -114,7 +114,7 @@
       String threadName = Thread.currentThread().getName();
       LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
       try {
-        // this the data copy
+        // Data Copy in case of ExportTask
         List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
             forReplicationSpec, hiveConf);
         Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index bdf09ac92b..fe749c9fdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -64,10 +64,15 @@
   public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
+    writeOutput(listValues, outputFile, hiveConf, false);
+  }
+
+  public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update)
+      throws SemanticException {
     DataOutputStream outStream = null;
     try {
       FileSystem fs = outputFile.getFileSystem(hiveConf);
-      outStream = fs.create(outputFile);
+      outStream = fs.create(outputFile, update);
       for (List<String> values : listValues) {
         outStream.writeBytes((values.get(0) == null ?
             Utilities.nullStringOutput : values.get(0)));
         for (int i = 1; i < values.size(); i++) {
@@ -100,6 +105,20 @@ public Void execute() throws IOException {
     }
   }
 
+  public static boolean create(Path outputFile, HiveConf hiveConf, boolean replace)
+      throws SemanticException {
+    try {
+      FileSystem fs = outputFile.getFileSystem(hiveConf);
+      if (fs.exists(outputFile)) {
+        return true;
+      }
+      create(outputFile, hiveConf);
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+    return false;
+  }
+
   public static Iterable<? extends String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index f275194b6d..e538c79f34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -196,6 +196,10 @@ private void initializeIfNot() throws SemanticException {
   }
 
   public void write() throws SemanticException {
+    write(false);
+  }
+
+  public void write(boolean replace) throws SemanticException {
     List<List<String>> listValues = new ArrayList<>();
     listValues.add(
         Arrays.asList(
@@ -208,7 +212,6 @@ public void write() throws SemanticException {
     if (replScope != null) {
       listValues.add(prepareReplScopeValues());
     }
-    Utils.writeOutput(listValues, dumpFile, hiveConf
-    );
+    Utils.writeOutput(listValues, dumpFile, hiveConf, replace);
   }
 }
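
For reference, the following is a minimal, standalone sketch of the resume protocol the patch introduces in ReplDumpTask around the new _events_dump marker: after each dumped event, "LastEventID<TAB>id" is (over)written; a re-run reads the marker back and skips events at or below it. This sketch is illustrative only and is not part of the patch: it uses java.nio instead of Hadoop's FileSystem API, and the class and method names (EventsDumpCheckpoint, resumeFrom, markDumped) are hypothetical.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical, simplified model of the _events_dump checkpoint used to resume an
// incremental repl dump. Mirrors the idea of getResumeFrom()/updateLastEventDumped(),
// but written against java.nio rather than Hadoop's FileSystem.
public class EventsDumpCheckpoint {
  private static final String LAST_EVENT_ID_NAME = "LastEventID";
  private final Path ackFile; // e.g. <dumpRoot>/hive/_events_dump

  public EventsDumpCheckpoint(Path ackFile) {
    this.ackFile = ackFile;
  }

  /** Returns the event id to resume from: the checkpointed id if present, else the default. */
  public long resumeFrom(long defaultEventId) throws IOException {
    if (!Files.exists(ackFile)) {
      return defaultEventId;
    }
    List<String> lines = Files.readAllLines(ackFile, StandardCharsets.UTF_8);
    if (lines.isEmpty()) {
      return defaultEventId;
    }
    // The marker is written as "LastEventID<TAB><id>".
    String[] parts = lines.get(0).split("\t", 2);
    return parts.length == 2 ? Long.parseLong(parts[1]) : defaultEventId;
  }

  /** Overwrites the marker after each successfully dumped event. */
  public void markDumped(long eventId) throws IOException {
    Files.write(ackFile, (LAST_EVENT_ID_NAME + "\t" + eventId).getBytes(StandardCharsets.UTF_8));
  }

  // Usage sketch: skip events at or below the checkpoint, dump the rest, checkpoint each one.
  public static void main(String[] args) throws IOException {
    EventsDumpCheckpoint checkpoint = new EventsDumpCheckpoint(Paths.get("/tmp/_events_dump"));
    long resumeFrom = checkpoint.resumeFrom(100L);
    for (long eventId = 101L; eventId <= 110L; eventId++) {
      if (eventId <= resumeFrom) {
        continue; // already dumped by the previous, failed run
      }
      // dumpEvent(eventId) would go here
      checkpoint.markDumped(eventId);
    }
  }
}

The _finished_dump acknowledgement is still written only once, after the last event, which is why the tests above delete it (and selected event directories) to simulate a failure part-way through and then assert which directories get rewritten on the resumed dump.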