diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c46103a254..5731f6a4a9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -312,7 +312,22 @@ public void testBasic() throws IOException {
     verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver);

     String replicatedDbName = dbName + "_dupe";
-    bootstrapLoadAndVerify(dbName, replicatedDbName);
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    boolean dumpAckFound = false;
+    boolean loadAckFound = false;
+    for (FileStatus status : fs.listStatus(dumpPath)) {
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+        dumpAckFound = true;
+      }
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+        loadAckFound = true;
+      }
+    }
+    assertTrue(dumpAckFound);
+    assertTrue(loadAckFound);

     verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror);
@@ -838,7 +853,26 @@ public void testIncrementalAdds() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver);

     // Perform REPL-DUMP/LOAD
-    incrementalLoadAndVerify(dbName, replDbName);
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
+    boolean dumpAckFound = false;
+    boolean loadAckFound = false;
+    assertFalse(fs.exists(new Path(bootstrapDump.dumpLocation)));
+    fs = new Path(incrementalDump.dumpLocation).getFileSystem(hconf);
+    Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    dumpAckFound = false;
+    loadAckFound = false;
+    for (FileStatus status : fs.listStatus(dumpPath)) {
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+        dumpAckFound = true;
+      }
+      if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+        loadAckFound = true;
+      }
+    }
+
+    assertTrue(dumpAckFound);
+    assertTrue(loadAckFound);

     // VERIFY tables and partitions on destination for equivalence.
     verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 92e45b4c57..ccbded6a91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -126,21 +126,28 @@ public int execute() {
       Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
           Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
               .getBytes(StandardCharsets.UTF_8.name())));
-      Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
-      Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-      DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
-      // Initialize ReplChangeManager instance since we will require it to encode file URI.
-      ReplChangeManager.getInstance(conf);
-      Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
-      Long lastReplId;
-      if (!dumpRoot.getFileSystem(conf).exists(dumpRoot)
-          || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) {
-        lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
-      } else {
-        work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot));
-        lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+      Path previousDumpMetaPath = getPreviousDumpMetadataPath(dumpRoot);
+      Path previousHiveDumpPath =
+          previousDumpMetaPath != null ? new Path(previousDumpMetaPath, ReplUtils.REPL_HIVE_BASE_DIR) : null;
+      //If no previous dump is present or previous dump was loaded, proceed
+      if (previousHiveDumpPath == null || dumpMetadataLoaded(previousHiveDumpPath)) {
+        Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+        Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+        DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
+        // Initialize ReplChangeManager instance since we will require it to encode file URI.
+        ReplChangeManager.getInstance(conf);
+        Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+        Long lastReplId;
+        if (previousHiveDumpPath == null) {
+          lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        } else {
+          work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath));
+          lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        }
+        prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
+        writeDumpCompleteAck(hiveDumpRoot);
+        deletePreviousDumpMeta(previousDumpMetaPath);
       }
-      prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
@@ -149,16 +156,21 @@ public int execute() {
     return 0;
   }

-  private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException {
-    FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot);
-    if (statuses.length > 0) {
-      FileStatus latestUpdatedStatus = statuses[0];
-      for (FileStatus status : statuses) {
-        if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
-          latestUpdatedStatus = status;
-        }
-      }
-      DumpMetaData dmd = new DumpMetaData(new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf);
+  private void deletePreviousDumpMeta(Path previousDumpMetaPath) throws IOException {
+    if (previousDumpMetaPath != null) {
+      FileSystem fs = previousDumpMetaPath.getFileSystem(conf);
+      fs.delete(previousDumpMetaPath, true);
+    }
+  }
+
+  private void writeDumpCompleteAck(Path currentDumpPath) throws SemanticException {
+    Path ackPath = new Path(currentDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT);
+    Utils.write(ackPath, conf);
+  }
+
+  private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException {
+    if (previousDumpPath != null) {
+      DumpMetaData dmd = new DumpMetaData(previousDumpPath, conf);
       if (dmd.isIncrementalDump()) {
         return dmd.getEventTo();
       }
@@ -168,6 +180,32 @@ private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException,
     return 0L;
   }

+  private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    if (fs.exists(dumpRoot)) {
+      FileStatus[] statuses = fs.listStatus(dumpRoot);
+      if (statuses.length == 1) {
+        return statuses[0].getPath();
+      }
+    }
+    return null;
+  }
+
+  private boolean dumpMetadataLoaded(Path previousDumpPath) throws IOException {
+    if (previousDumpPath != null) {
+      FileSystem fs = previousDumpPath.getFileSystem(conf);
+      if (fs.exists(previousDumpPath)) {
+        FileStatus[] latestUpdateStatuses = fs.listStatus(previousDumpPath);
+        for (FileStatus status : latestUpdateStatuses) {
+          if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
   private void prepareReturnValues(List<String> values) throws SemanticException {
     LOG.debug("prepareReturnValues : " + dumpSchema);
     for (String s : values) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index a2c467bafd..39bc83fe03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.api.StageType;

 import java.io.IOException;
@@ -97,19 +98,33 @@ public StageType getType() {

   @Override
   public int execute() {
-    Task rootTask = work.getRootTask();
-    if (rootTask != null) {
-      rootTask.setChildTasks(null);
-    }
-    work.setRootTask(this);
-    this.parentTasks = null;
-    if (work.isIncrementalLoad()) {
-      return executeIncrementalLoad();
-    } else {
-      return executeBootStrapLoad();
+    try {
+      Task rootTask = work.getRootTask();
+      if (rootTask != null) {
+        rootTask.setChildTasks(null);
+      }
+      work.setRootTask(this);
+      this.parentTasks = null;
+      int status = 0;
+      if (work.isIncrementalLoad()) {
+        status = executeIncrementalLoad();
+      } else {
+        status = executeBootStrapLoad();
+      }
+      writeLoadCompleteAck(work.dumpDirectory);
+      return status;
+    } catch (SemanticException e) {
+      LOG.error("failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }

+  private void writeLoadCompleteAck(String dumpDirectory) throws SemanticException {
+    Path ackPath = new Path(dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT);
+    Utils.write(ackPath, conf);
+  }
+
   private int executeBootStrapLoad() {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 64ecf42eb4..a7c5ea0969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -98,7 +98,10 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
-
+  //Acknowledgement for repl dump complete
+  public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump";
+  //Acknowledgement for repl load complete
+  public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
   */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c2e9f883ce..52b5d7875c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -83,7 +83,6 @@
   // By default, this will be same as that of super class BaseSemanticAnalyzer. But need to obtain again
   // if the Hive configs are received from WITH clause in REPL LOAD or REPL STATUS commands.
   private Hive db;
-  private boolean isTargetAlreadyLoaded;

   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";

@@ -390,7 +389,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // tells us what is inside that dumpdir.

       //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
-      if (!isTargetAlreadyLoaded) {
+      if (loadPath != null) {
         DumpMetaData dmd = new DumpMetaData(loadPath, conf);

         boolean evDump = false;
@@ -429,36 +428,20 @@ private Path getCurrentLoadPath() throws IOException, SemanticException {
     }
     FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
     if (statuses.length > 0) {
-      //sort based on last modified. Recent one is at the end
+      //sort based on last modified. Recent one is at the beginning
       Arrays.sort(statuses, new Comparator<FileStatus>() {
         public int compare(FileStatus f1, FileStatus f2) {
-          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+          return Long.compare(f2.getModificationTime(), f1.getModificationTime());
         }
       });
-      if (replScope.getDbName() != null) {
-        String currentReplStatusOfTarget
-            = getReplStatus(replScope.getDbName());
-        if (currentReplStatusOfTarget == null) { //bootstrap
-          return new Path(statuses[0].getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-        } else {
-          DumpMetaData latestDump = new DumpMetaData(
-              new Path(statuses[statuses.length - 1].getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf);
-          if (Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) {
-            isTargetAlreadyLoaded = true;
-          } else {
-            for (FileStatus status : statuses) {
-              Path hiveLoadPath = new Path(status.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-              DumpMetaData dmd = new DumpMetaData(hiveLoadPath, conf);
-              if (dmd.isIncrementalDump()
-                  && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) {
-                return hiveLoadPath;
-              }
-            }
+      Path hiveDumpPath = new Path(statuses[0].getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
+      if (loadPathBase.getFileSystem(conf).exists(hiveDumpPath)) {
+        FileStatus[] dumpStatuses = loadPathBase.getFileSystem(conf).listStatus(hiveDumpPath);
+        for (FileStatus dumpStatus : dumpStatuses) {
+          if (dumpStatus.getPath().getName().contains(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+            return hiveDumpPath;
           }
         }
-      } else {
-        //If dbname is null(in case of repl load *), can't get repl status of target, return unsupported
-        throw new UnsupportedOperationException("REPL LOAD * is not supported");
       }
     }
     return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 6f8912b5f9..7030451c0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -82,6 +82,16 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
     }
   }

+  public static void write(Path outputFile, HiveConf hiveConf)
+      throws SemanticException {
+    try {
+      FileSystem fs = outputFile.getFileSystem(hiveConf);
+      fs.create(outputFile);
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+  }
+
   public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();
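
Note (not part of the patch): taken together, the changes implement a two-sided acknowledgement handshake. REPL DUMP writes _finished_dump once a dump directory is complete, refuses to start a new dump until the previous one carries _finished_load, and then deletes the consumed dump; REPL LOAD only picks up a dump directory that carries the dump ack, and leaves the load ack behind when it finishes. The sketch below is a minimal standalone illustration of that protocol using only the Hadoop FileSystem API; the marker names mirror the ReplUtils constants added above, while the class and method names (AckHandshakeSketch, dumpCycle, loadCycle) are hypothetical and not part of the patch.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class AckHandshakeSketch {
  // Marker file names, matching the constants added to ReplUtils by this patch.
  private static final String DUMP_ACK = "_finished_dump";
  private static final String LOAD_ACK = "_finished_load";

  // Dump side: start a new dump only if there is no previous dump, or the
  // previous dump carries the load ack; delete the consumed dump afterwards.
  static void dumpCycle(FileSystem fs, Path previousDump, Path newDump) throws IOException {
    if (previousDump != null && !fs.exists(new Path(previousDump, LOAD_ACK))) {
      return; // previous dump not consumed by the target yet; do nothing this cycle
    }
    // ... write the dump contents under newDump ...
    fs.create(new Path(newDump, DUMP_ACK)).close(); // dump is complete and safe to read
    if (previousDump != null) {
      fs.delete(previousDump, true); // reclaim the dump the target already loaded
    }
  }

  // Load side: pick up a dump directory only once it carries the dump ack,
  // and write the load ack so the next dump cycle can proceed.
  static boolean loadCycle(FileSystem fs, Path dumpDir) throws IOException {
    if (!fs.exists(new Path(dumpDir, DUMP_ACK))) {
      return false; // dump still being written (or failed); skip it
    }
    // ... replay the dump into the target warehouse ...
    fs.create(new Path(dumpDir, LOAD_ACK)).close();
    return true;
  }
}

Because at most one unconsumed dump can exist under the dump root at a time, getPreviousDumpMetadataPath can assume a single child directory, and REPL LOAD no longer needs to compare the target's last replicated event ID against every dump, which is why the isTargetAlreadyLoaded bookkeeping could be removed from ReplicationSemanticAnalyzer.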