diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3072488388..241a15a357 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -877,6 +877,49 @@ private void verifyIfSrcOfReplPropMissing(Map props) {
     assertFalse(props.containsKey(SOURCE_OF_REPLICATION));
   }
 
+  @Test
+  public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .status(replicatedDbName)
+        .verifyResult(tuple.lastReplicationId);
+
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .status(replicatedDbName)
+        .verifyResult(tuple.lastReplicationId);
+
+    // create events for some other database and then dump the primaryDbName to dump an empty directory.
+    String testDbName = primaryDbName + "_test";
+    tuple = primary.run(" create database " + testDbName)
+        .run("create table " + testDbName + ".tbl (fld int)")
+        .dump(primaryDbName, tuple.lastReplicationId);
+
+    // Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .status(replicatedDbName)
+        .verifyResult(tuple.lastReplicationId);
+    Database replicaDb = replica.getDatabase(replicatedDbName);
+    assertTrue(replicaDb.getParameters().get("repl.last.id").equalsIgnoreCase(tuple.lastReplicationId));
+
+    // Incremental load to non existing db should return database not exist error.
+    tuple = primary.dump("someJunkDB", tuple.lastReplicationId);
+    CommandProcessorResponse response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
+    assertTrue(response.getErrorMessage().toLowerCase().contains(
+        "org.apache.hadoop.hive.ql.metadata.hiveexception: database does not exist"));
+
+    // Bootstrap load from an empty dump directory should return empty load directory error.
+    tuple = primary.dump("someJunkDB", null);
+    response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
+    assertTrue(response.getErrorMessage().toLowerCase().contains(
+        "org.apache.hadoop.hive.ql.parse.semanticexception: no data to load in path"));
+
+    primary.run(" drop database if exists " + testDbName + " cascade");
+    replica.run(" drop database if exists " + testDbName + " cascade");
+  }
+
   @Test
   public void testIncrementalDumpMultiIteration() throws Throwable {
     WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
index 4b37c8dd98..5638ace714 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -44,7 +44,10 @@ public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOEx
     FileSystem fs = eventPath.getFileSystem(conf);
     eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
     if ((eventDirs == null) || (eventDirs.length == 0)) {
-      throw new IllegalArgumentException("No data to load in path " + loadPath);
+      // An empty dump directory is valid for incremental load: report zero events instead of failing.
+      currentIndex = 0;
+      numEvents = 0;
+      return;
     }
     // For event dump, each sub-dir is an individual event dump.
     // We need to guarantee that the directory listing we got is in order of event id.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 2aefb574ca..4a8779ee9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -70,6 +70,7 @@
   private final HiveConf conf;
   private final ReplLogger replLogger;
   private static long numIteration;
+  private final String loadPath;
 
   public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath,
       IncrementalLoadEventsIterator iterator, HiveConf conf) {
@@ -83,6 +84,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP
     replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents());
     numIteration = 0;
     replLogger.startLog();
+    this.loadPath = loadPath;
   }
 
   public Task build(DriverContext driverContext, Hive hive, Logger log,
@@ -95,6 +97,21 @@
     this.log.debug("Iteration num " + numIteration);
     TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
 
+    // If there are no events to replay, update the last repl id of the database/table to the last event id
+    // recorded in the dump metadata, so repl status still advances on an empty incremental dump.
+    if (!iterator.hasNext()) {
+      DumpMetaData eventDmd = new DumpMetaData(new Path(loadPath), conf);
+      if (StringUtils.isEmpty(tableName)) {
+        taskChainTail = dbUpdateReplStateTask(dbName, eventDmd.getEventTo().toString(), taskChainTail);
+        this.log.debug("no events to replay, set last repl id of db " + dbName + " to "
+            + eventDmd.getEventTo().toString());
+      } else {
+        taskChainTail = tableUpdateReplStateTask(dbName, tableName, null,
+            eventDmd.getEventTo().toString(), taskChainTail);
+        this.log.debug("no events to replay, set last repl id of table " + dbName + "." + tableName + " to "
+            + eventDmd.getEventTo().toString());
+      }
+    }
+
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       FileStatus dir = iterator.next();
       String location = dir.getPath().toUri().toString();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index eea3ae8bae..14416fed84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -377,7 +377,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
       if (srcs == null || (srcs.length == 0)) {
         LOG.warn("Nothing to load at {}", loadPath.toUri().toString());
-        return;
       }
 
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,