diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index c9092b1..4a55e90 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -1013,6 +1013,71 @@ public void testViewsReplication() throws IOException {
   }
 
   @Test
+  public void testDumpLimit() throws IOException {
+    String testName = "dumpLimit";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+
+    String[] unptn_data = new String[] { "eleven", "twelve", "thirteen" };
+    String[] unptn_data_load1 = new String[] { "eleven" };
+    String[] unptn_data_load2 = new String[] { "eleven", "twelve" };
+
+    // 3 events to insert, last repl ID: replDumpId+3
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    // 3 events to insert, last repl ID: replDumpId+6
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    // 3 events to insert, last repl ID: replDumpId+9
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[2] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3");
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data_load1);
+
+    advanceDumpDir();
+    Integer lastReplID = Integer.valueOf(replDumpId);
+    lastReplID += 1000;
+    String toReplID = String.valueOf(lastReplID);
+
+    run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3");
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data_load2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
+  }
+
+  @Test
   public void testStatus() throws IOException {
     // first test ReplStateMap functionality
     Map<String, Long> cmap = new ReplStateMap<String, Long>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 011df19..be3fc99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -366,6 +366,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
      if (eventTo == null){
        eventTo = db.getMSC().getCurrentNotificationEventId().getEventId();
        LOG.debug("eventTo not specified, using current event id : {}", eventTo);
+     } else if (eventTo < eventFrom) {
+       throw new Exception("Invalid event ID input received in TO clause");
      }
 
      Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
@@ -390,20 +392,23 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
      EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
          evFetcher, eventFrom, maxEventLimit, evFilter);
 
+     lastReplId = eventTo;
      while (evIter.hasNext()){
        NotificationEvent ev = evIter.next();
-       Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+       lastReplId = ev.getEventId();
+       Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
        dumpEvent(ev, evRoot, cmRoot);
      }
 
+     //Set the current last repl ID
+     eventTo = lastReplId;
+     LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo);
      writeOutput(
          Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)),
          dmd.getDumpFilePath());
      dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo, cmRoot);
      dmd.write();
-     // Set the correct last repl id to return to the user
-     lastReplId = eventTo;
    }
    prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)),
        dumpSchema);
    setFetchTask(createFetchTask(dumpSchema));
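
The test above exercises the new LIMIT clause by replicating events in bounded batches: each REPL DUMP returns a (dump location, last repl id) row, and the returned ID is fed into the FROM clause of the next dump. Below is a minimal client-side sketch of that consumption pattern, assuming a JDBC connection to HiveServer2; the ReplBatchDriver class, connection URLs, database names, and BATCH_SIZE are illustrative stand-ins and not part of this patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative driver loop for the batched-dump pattern enabled by
// "REPL DUMP <db> FROM <id> LIMIT <n>": each dump returns a row of
// (dump_dir, last_repl_id), and last_repl_id becomes the FROM of the
// next cycle, bounding how many events any single dump transfers.
public class ReplBatchDriver {

  private static final int BATCH_SIZE = 50; // illustrative batch size

  public static void main(String[] args) throws SQLException {
    // Placeholder connection strings; point these at real HiveServer2 instances.
    try (Connection src = DriverManager.getConnection("jdbc:hive2://source:10000/default");
         Connection dst = DriverManager.getConnection("jdbc:hive2://target:10000/default");
         Statement srcStmt = src.createStatement();
         Statement dstStmt = dst.createStatement()) {

      // Bootstrap: full dump and load, keeping the returned repl ID as checkpoint.
      String lastReplId;
      try (ResultSet rs = srcStmt.executeQuery("REPL DUMP testdb")) {
        rs.next();
        dstStmt.execute("REPL LOAD testdb_replica FROM '" + rs.getString(1) + "'");
        lastReplId = rs.getString(2);
      }

      // Incremental cycles: dump at most BATCH_SIZE events past the checkpoint,
      // load them, and advance the checkpoint until the returned ID stops moving.
      while (true) {
        try (ResultSet rs = srcStmt.executeQuery(
            "REPL DUMP testdb FROM " + lastReplId + " LIMIT " + BATCH_SIZE)) {
          rs.next();
          String dumpLocation = rs.getString(1);
          String newReplId = rs.getString(2);
          if (newReplId.equals(lastReplId)) {
            break; // caught up: no events were dumped past the checkpoint
          }
          dstStmt.execute("REPL LOAD testdb_replica FROM '" + dumpLocation + "'");
          lastReplId = newReplId;
        }
      }
    }
  }
}

The sketch depends on the analyzer change above: lastReplId is advanced to the ID of the last event actually dumped rather than the requested eventTo, so a LIMIT-truncated dump still returns an accurate resume point. This is what testDumpLimit verifies when only the first inserted row appears in the replica after the first incremental load.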