diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index fae1a2fa10..fb797ff3e3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -877,6 +877,46 @@ private void verifyIfSrcOfReplPropMissing(Map props) { assertFalse(props.containsKey(SOURCE_OF_REPLICATION)); } + @Test + public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + // create events for some other database and then dump the primaryDbName to dump an empty directory. + String testDbName = primaryDbName + "_test"; + tuple = primary.run(" create database " + testDbName) + .run("create table " + testDbName + ".tbl (fld int)") + .dump(primaryDbName, tuple.lastReplicationId); + + // Incremental load to existing database with empty dump directory should set the repl id to the last event at src. + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + // Incremental load to non existing db should return database not exist error. 
+ tuple = primary.dump("someJunkDB", tuple.lastReplicationId); + CommandProcessorResponse response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation); + assertTrue(response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.metadata.hiveexception: " + + "database does not exist")); + + // Bootstrap load from an empty dump directory should return empty load directory error. + tuple = primary.dump("someJunkDB", null); + response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation); + assertTrue(response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.parse.semanticexception:" + + " no data to load in path")); + + primary.run(" drop database if exists " + testDbName + " cascade"); + } + @Test public void testIncrementalDumpMultiIteration() throws Throwable { WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index fdbcb15c72..ff21b6a601 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -53,7 +53,7 @@ final LineageState sessionStateLineageState; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException { + String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; @@ -64,7 +64,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad this.bootstrapIterator = null; this.constraintsIterator = null; incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, - incrementalIterator, 
hiveConf); + incrementalIterator, hiveConf, eventTo); } else { this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); @@ -73,11 +73,6 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad } } - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, - LineageState lineageState) throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false); - } - public BootstrapEventsIterator iterator() { return bootstrapIterator; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java index 4b37c8dd98..5638ace714 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java @@ -44,7 +44,9 @@ public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOEx FileSystem fs = eventPath.getFileSystem(conf); eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs)); if ((eventDirs == null) || (eventDirs.length == 0)) { - throw new IllegalArgumentException("No data to load in path " + loadPath); + currentIndex = 0; + numEvents = 0; + return; } // For event dump, each sub-dir is an individual event dump. // We need to guarantee that the directory listing we got is in order of event id. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 2aefb574ca..c626047867 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -70,9 +70,10 @@ private final HiveConf conf; private final ReplLogger replLogger; private static long numIteration; + private Long eventTo; public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, - IncrementalLoadEventsIterator iterator, HiveConf conf) { + IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) { this.dbName = dbName; this.tableName = tableName; this.iterator = iterator; @@ -83,6 +84,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); numIteration = 0; replLogger.startLog(); + this.eventTo = eventTo; } public Task build(DriverContext driverContext, Hive hive, Logger log, @@ -95,6 +97,19 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP this.log.debug("Iteration num " + numIteration); TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); + // if no events are there to replay, then update the last repl id of the database/ table to last event id. 
+ if (!iterator.hasNext() && (eventTo != null)) { + String lastEventId = eventTo.toString(); + if (StringUtils.isEmpty(tableName)) { + taskChainTail = dbUpdateReplStateTask(dbName, lastEventId, taskChainTail); + this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventId); + } else { + taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, lastEventId, taskChainTail); + this.log.debug("no events to replay, set last repl id of table " + dbName + "." + tableName + " to " + + lastEventId); + } + } + while (iterator.hasNext() && tracker.canAddMoreTasks()) { FileStatus dir = iterator.next(); String location = dir.getPath().toUri().toString(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index e4a128182c..cfeb31a24f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -359,21 +359,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { LOG.debug("{} contains an bootstrap dump", loadPath); } - if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState(), false); - rootTasks.add(TaskFactory.get(replLoadWork, conf)); - return; - } - - FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath); - if (srcs == null || (srcs.length == 0)) { - LOG.warn("Nothing to load at {}", loadPath.toUri().toString()); - return; - } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState(), evDump); + tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } catch (Exception e) { // TODO : 
simple wrap & rethrow for now, clean up with error codes