diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index fd069688e3..774c6905d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -124,20 +124,25 @@ public int execute() {
       Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
           Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
               .getBytes(StandardCharsets.UTF_8.name())));
-      Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
-      DumpMetaData dmd = new DumpMetaData(currentDumpPath, conf);
-      // Initialize ReplChangeManager instance since we will require it to encode file URI.
-      ReplChangeManager.getInstance(conf);
-      Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
-      Long lastReplId;
-      if (!dumpRoot.getFileSystem(conf).exists(dumpRoot)
-          || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) {
-        lastReplId = bootStrapDump(currentDumpPath, dmd, cmRoot, hiveDb);
-      } else {
-        work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot));
-        lastReplId = incrementalDump(currentDumpPath, dmd, cmRoot, hiveDb);
+      Path previousDumpMetaPath = getPreviousDumpMetadataPath(dumpRoot);
+      // If no previous dump is present, or the previous dump was already loaded, proceed
+      if (previousDumpMetaPath == null || dumpMetadataLoaded(previousDumpMetaPath)) {
+        Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+        DumpMetaData dmd = new DumpMetaData(currentDumpPath, conf);
+        // Initialize ReplChangeManager instance since we will require it to encode file URI.
+        ReplChangeManager.getInstance(conf);
+        Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+        Long lastReplId;
+        if (previousDumpMetaPath == null) {
+          lastReplId = bootStrapDump(currentDumpPath, dmd, cmRoot, hiveDb);
+        } else {
+          work.setEventFrom(getEventFromPreviousDumpMetadata(previousDumpMetaPath));
+          lastReplId = incrementalDump(currentDumpPath, dmd, cmRoot, hiveDb);
+        }
+        prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
+        writeDumpCompleteAck(currentDumpPath);
+        deletePreviousDumpMeta(previousDumpMetaPath);
       }
-      prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
@@ -146,25 +151,52 @@ public int execute() {
     return 0;
   }
 
-  private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException {
-    FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot);
-    if (statuses.length > 0) {
-      FileStatus latestUpdatedStatus = statuses[0];
-      for (FileStatus status : statuses) {
-        if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
-          latestUpdatedStatus = status;
-        }
-      }
-      DumpMetaData dmd = new DumpMetaData(latestUpdatedStatus.getPath(), conf);
+  private void deletePreviousDumpMeta(Path previousDumpMetaPath) throws IOException {
+    if (previousDumpMetaPath != null) {
+      FileSystem fs = previousDumpMetaPath.getFileSystem(conf);
+      fs.delete(previousDumpMetaPath, true);
+    }
+  }
+
+  private void writeDumpCompleteAck(Path currentDumpPath) throws SemanticException {
+    Path ackPath = new Path(currentDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT);
+    Utils.write(ackPath, conf);
+  }
+
+  private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException {
+    if (previousDumpPath != null) {
+      DumpMetaData dmd = new DumpMetaData(previousDumpPath, conf);
       if (dmd.isIncrementalDump()) {
         return dmd.getEventTo();
       }
-      //bootstrap case return event from
+      // bootstrap case: return the event id the dump started from
      return dmd.getEventFrom();
    }
    return 0L;
  }
 
+  private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    FileStatus[] statuses = fs.listStatus(dumpRoot);
+    if (statuses.length == 1) {
+      return statuses[0].getPath();
+    }
+    return null;
+  }
+
+  private boolean dumpMetadataLoaded(Path previousDumpPath) throws IOException {
+    if (previousDumpPath != null) {
+      FileSystem fs = previousDumpPath.getFileSystem(conf);
+      FileStatus[] latestUpdateStatuses = fs.listStatus(previousDumpPath);
+      for (FileStatus status : latestUpdateStatuses) {
+        if (status.getPath().getName().equalsIgnoreCase(ReplUtils.LOAD_ACKNOWLEDGEMENT)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   private void prepareReturnValues(List<String> values) throws SemanticException {
     LOG.debug("prepareReturnValues : " + dumpSchema);
     for (String s : values) {
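For orientation, the dump-side change above boils down to an ack-driven resume check: a new dump is started only when the dump root is empty, or the single previous dump directory already carries the load acknowledgement. What follows is a minimal, self-contained sketch of that decision, assuming a hypothetical DumpResumeCheck class and a local LOAD_ACK constant standing in for ReplUtils.LOAD_ACKNOWLEDGEMENT; it is not the Hive task itself.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DumpResumeCheck {
  private static final String LOAD_ACK = "_finished_load"; // same value as ReplUtils.LOAD_ACKNOWLEDGEMENT

  // Returns true when a new dump may be started: either no previous dump exists,
  // or the single previous dump directory carries the load acknowledgement.
  static boolean canStartNewDump(Path dumpRoot, Configuration conf) throws IOException {
    FileSystem fs = dumpRoot.getFileSystem(conf);
    if (!fs.exists(dumpRoot) || fs.listStatus(dumpRoot).length == 0) {
      return true;                                   // nothing dumped yet: bootstrap
    }
    FileStatus[] statuses = fs.listStatus(dumpRoot);
    if (statuses.length != 1) {
      return false;                                  // unexpected layout: leave the existing dumps alone
    }
    for (FileStatus status : fs.listStatus(statuses[0].getPath())) {
      if (status.getPath().getName().equalsIgnoreCase(LOAD_ACK)) {
        return true;                                 // previous dump was consumed by REPL LOAD
      }
    }
    return false;                                    // previous dump is still waiting to be loaded
  }
}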
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index a2c467bafd..39bc83fe03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
 import java.io.IOException;
@@ -97,19 +98,33 @@ public StageType getType() {
 
   @Override
   public int execute() {
-    Task rootTask = work.getRootTask();
-    if (rootTask != null) {
-      rootTask.setChildTasks(null);
-    }
-    work.setRootTask(this);
-    this.parentTasks = null;
-    if (work.isIncrementalLoad()) {
-      return executeIncrementalLoad();
-    } else {
-      return executeBootStrapLoad();
+    try {
+      Task rootTask = work.getRootTask();
+      if (rootTask != null) {
+        rootTask.setChildTasks(null);
+      }
+      work.setRootTask(this);
+      this.parentTasks = null;
+      int status = 0;
+      if (work.isIncrementalLoad()) {
+        status = executeIncrementalLoad();
+      } else {
+        status = executeBootStrapLoad();
+      }
+      writeLoadCompleteAck(work.dumpDirectory);
+      return status;
+    } catch (SemanticException e) {
+      LOG.error("failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
 
+  private void writeLoadCompleteAck(String dumpDirectory) throws SemanticException {
+    Path ackPath = new Path(dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT);
+    Utils.write(ackPath, conf);
+  }
+
   private int executeBootStrapLoad() {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
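The load-side counterpart simply drops an empty marker file into the dump directory once the bootstrap or incremental load finishes, which is what writeLoadCompleteAck above does via Utils.write. A minimal sketch of the same idea, with a hypothetical LoadAckWriter class and a hard-coded marker name standing in for ReplUtils.LOAD_ACKNOWLEDGEMENT:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LoadAckWriter {
  private static final String LOAD_ACK = "_finished_load"; // stands in for ReplUtils.LOAD_ACKNOWLEDGEMENT

  // Drop an empty marker file into the dump directory once the load has finished.
  static void writeLoadCompleteAck(String dumpDirectory, Configuration conf) throws IOException {
    Path ackPath = new Path(dumpDirectory, LOAD_ACK);
    FileSystem fs = ackPath.getFileSystem(conf);
    fs.create(ackPath).close();                      // only the file's presence matters
  }
}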
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 504c9d4048..f4363395a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -94,7 +94,10 @@
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
-
+  // Acknowledgement for repl dump complete
+  public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump";
+  // Acknowledgement for repl load complete
+  public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
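These two markers give dump and load a simple handshake: REPL DUMP writes _finished_dump into the dump directory it produced, and REPL LOAD writes _finished_load into the dump directory it consumed. On the load side the dump ack decides which directory is safe to load; below is a minimal sketch of that selection, mirroring the getCurrentLoadPath change in the next file, with a hypothetical LoadPathSelector class and a local DUMP_ACK constant standing in for ReplUtils.DUMP_ACKNOWLEDGEMENT.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LoadPathSelector {
  private static final String DUMP_ACK = "_finished_dump"; // stands in for ReplUtils.DUMP_ACKNOWLEDGEMENT

  // Pick the newest dump directory under the load path, but only if it has been
  // acknowledged as complete by the dump side; otherwise there is nothing to load.
  static Path pickLoadPath(Path loadPathBase, Configuration conf) throws IOException {
    FileSystem fs = loadPathBase.getFileSystem(conf);
    FileStatus[] statuses = fs.listStatus(loadPathBase);
    if (statuses.length == 0) {
      return null;                                   // nothing dumped yet
    }
    // newest dump first
    Arrays.sort(statuses, (f1, f2) -> Long.compare(f2.getModificationTime(), f1.getModificationTime()));
    for (FileStatus inside : fs.listStatus(statuses[0].getPath())) {
      if (inside.getPath().getName().contains(DUMP_ACK)) {
        return inside.getPath().getParent();         // return the dump directory, not the ack file
      }
    }
    return null;                                     // newest dump not yet acknowledged
  }
}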
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 703eb1159c..0e920c839d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -82,7 +83,6 @@
   // By default, this will be same as that of super class BaseSemanticAnalyzer. But need to obtain again
   // if the Hive configs are received from WITH clause in REPL LOAD or REPL STATUS commands.
   private Hive db;
-  private boolean isTargetAlreadyLoaded;
 
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
@@ -389,7 +389,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     // tells us what is inside that dumpdir.
 
     //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
-    if (!isTargetAlreadyLoaded) {
+    if (loadPath != null) {
       DumpMetaData dmd = new DumpMetaData(loadPath, conf);
 
       boolean evDump = false;
@@ -428,34 +428,17 @@ private Path getCurrentLoadPath() throws IOException, SemanticException {
     }
     FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
     if (statuses.length > 0) {
-      //sort based on last modified. Recent one is at the end
+      //sort based on last modified. Recent one is at the beginning
       Arrays.sort(statuses, new Comparator<FileStatus>() {
         public int compare(FileStatus f1, FileStatus f2) {
-          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+          return Long.compare(f2.getModificationTime(), f1.getModificationTime());
         }
       });
-      if (replScope.getDbName() != null) {
-        String currentReplStatusOfTarget
-                = getReplStatus(replScope.getDbName());
-        if (currentReplStatusOfTarget == null) { //bootstrap
-          return statuses[0].getPath();
-        } else {
-          DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
-          if (Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) {
-            isTargetAlreadyLoaded = true;
-          } else {
-            for (FileStatus status : statuses) {
-              DumpMetaData dmd = new DumpMetaData(status.getPath(), conf);
-              if (dmd.isIncrementalDump()
-                      && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) {
-                return status.getPath();
-              }
-            }
-          }
+      FileStatus[] dumpStatuses = loadPathBase.getFileSystem(conf).listStatus(statuses[0].getPath());
+      for (FileStatus dumpStatus : dumpStatuses) {
+        if (dumpStatus.getPath().getName().contains(ReplUtils.DUMP_ACKNOWLEDGEMENT)) {
+          return dumpStatus.getPath().getParent();
         }
-      } else {
-        //If dbname is null(in case of repl load *), can't get repl status of target, return unsupported
-        throw new UnsupportedOperationException("REPL LOAD * is not supported");
       }
     }
     return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 6f8912b5f9..7030451c0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -82,6 +82,16 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
     }
   }
 
+  public static void write(Path outputFile, HiveConf hiveConf)
+          throws SemanticException {
+    try {
+      FileSystem fs = outputFile.getFileSystem(hiveConf);
+      fs.create(outputFile).close();
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+  }
+
   public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
     if (dbPattern == null) {
       return db.getAllDatabases();
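Utils.write only needs to create an empty file whose presence can later be tested, so closing the stream returned by FileSystem.create is enough. Below is a small, hypothetical round-trip check of that behaviour against the local file system; the class name, the /tmp path and the marker name are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AckRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();            // local file system by default
    Path dumpDir = new Path("/tmp/repl-dump-example");    // hypothetical dump directory
    Path ack = new Path(dumpDir, "_finished_dump");       // same value as ReplUtils.DUMP_ACKNOWLEDGEMENT
    FileSystem fs = ack.getFileSystem(conf);
    fs.mkdirs(dumpDir);
    fs.create(ack).close();                               // close so the empty marker is persisted
    System.out.println("ack present: " + fs.exists(ack));
  }
}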