diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index b59833d..8b33b78 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; @@ -764,11 +765,65 @@ public void testIncrementalLoadWithVariableLengthEventId() throws IOException, T run("TRUNCATE TABLE " + dbName + ".unptned", driver); run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); - // Inject a behaviour where all events will get ID less than 100 except TRUNCATE which will get ID 100. - // This enesures variable length of event ID in the incremental dump - BehaviourInjection eventIdModifier + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + String incrementalDumpLocn = incrementalDump.dumpLocation; + replDumpId = incrementalDump.lastReplId; + + // Rename the event directories in such a way that the lengths of their names vary. + // We will encounter create_table, truncate followed by insert. + // For the insert, set the event ID longer such that the old comparator picks insert before truncate + // Eg: Event IDs CREATE_TABLE - 5, TRUNCATE - 9, INSERT - 12 changed to + // CREATE_TABLE - 5, TRUNCATE - 9, INSERT - 100 + // But if TRUNCATE has ID-10, then having INSERT-100 won't be sufficient to test the scenario. 
+ // So, we make every event that comes after CREATE_TABLE start from 20. + // Eg: Event IDs CREATE_TABLE - 5, TRUNCATE - 10, INSERT - 12 changed to + // CREATE_TABLE - 5, TRUNCATE - 20(20 <= Id < 100), INSERT - 100 + Path dumpPath = new Path(incrementalDumpLocn); + FileSystem fs = dumpPath.getFileSystem(hconf); + FileStatus[] dirsInLoadPath = fs.listStatus(dumpPath, EximUtil.getDirectoryFilter(fs)); + Arrays.sort(dirsInLoadPath, new EventDumpDirComparator()); + long nextEventId = 0; + for (FileStatus dir : dirsInLoadPath) { + Path srcPath = dir.getPath(); + if (nextEventId == 0) { + nextEventId = (long) Math.pow(10.0, (double) srcPath.getName().length()) * 2; + continue; + } + Path destPath = new Path(srcPath.getParent(), String.valueOf(nextEventId)); + fs.rename(srcPath, destPath); + LOG.info("Renamed eventDir {} to {}", srcPath.getName(), destPath.getName()); + // Once the eventId reaches 5-20-100, then just increment it sequentially. This is to avoid longer values. + if (String.valueOf(nextEventId).length() - srcPath.getName().length() >= 2) { + nextEventId++; + } else { + nextEventId = (long) Math.pow(10.0, (double) String.valueOf(nextEventId).length()); + } + } + + // Load from modified dump event directories. + run("REPL LOAD " + replDbName + " FROM '" + incrementalDumpLocn + "'", driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + } + + @Test + public void testIncrementalReplWithEventsMissing() throws IOException, TException { + String testName = "incrementalReplWithEventsMissing"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + + // CREATE_TABLE - INSERT - TRUNCATE - INSERT - The result is just one record. 
+ String[] unptn_data = new String[]{ "eleven" }; + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('ten')", driver); + run("TRUNCATE TABLE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); + + // Inject a behaviour where some events are missing from the notification_log table. + // This ensures the incremental dump doesn't get all events for replication. + BehaviourInjection eventIdSkipper = new BehaviourInjection(){ - private long nextEventId = 0; // Initialize to 0 as for increment dump, 0 won't be used. @Nullable @Override @@ -779,73 +834,27 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event for (int i = 0; i < eventIds.size(); i++) { NotificationEvent event = eventIds.get(i); - // Skip all the events belong to other DBs/tables. - if (event.getDbName().equalsIgnoreCase(dbName)) { - // We will encounter create_table, truncate followed by insert. - // For the insert, set the event ID longer such that old comparator picks insert before truncate - // Eg: Event IDs CREATE_TABLE - 5, TRUNCATE - 9, INSERT - 12 changed to - // CREATE_TABLE - 5, TRUNCATE - 9, INSERT - 100 - // But if TRUNCATE have ID-10, then having INSERT-100 won't be sufficient to test the scenario. - // So, we set any event comes after CREATE_TABLE starts with 20. - // Eg: Event IDs CREATE_TABLE - 5, TRUNCATE - 10, INSERT - 12 changed to - // CREATE_TABLE - 5, TRUNCATE - 20(20 <= Id < 100), INSERT - 100 - switch (event.getEventType()) { - case "CREATE_TABLE": { - // The next ID is set to 20 or 200 or 2000 ... based on length of current event ID - // This is done to ensure TRUNCATE doesn't get an ID 10 or 100... - nextEventId = (long) Math.pow(10.0, (double) String.valueOf(event.getEventId()).length()) * 2; - break; - } - case "INSERT": { - // INSERT will come always after CREATE_TABLE, TRUNCATE. 
So, no need to validate nextEventId - nextEventId = (long) Math.pow(10.0, (double) String.valueOf(nextEventId).length()); - LOG.info("Changed EventId #{} to #{}", event.getEventId(), nextEventId); - event.setEventId(nextEventId++); - break; - } - default: { - // After CREATE_TABLE all the events in this DB should get an ID >= 20 or 200 ... - if (nextEventId > 0) { - LOG.info("Changed EventId #{} to #{}", event.getEventId(), nextEventId); - event.setEventId(nextEventId++); - } - break; - } - } - - outEventIds.add(event); + // Skip all the INSERT events + if (event.getDbName().equalsIgnoreCase(dbName) && event.getEventType().equalsIgnoreCase("INSERT")) { + injectionPathCalled = true; + continue; } + outEventIds.add(event); } - injectionPathCalled = true; - if (outEventIds.isEmpty()) { - return eventIdList; // If not even one event belongs to current DB, then return original one itself. - } else { - // If the new list is not empty (input list have some events from this DB), then return it - return new NotificationEventResponse(outEventIds); - } + + // Return the new list + return new NotificationEventResponse(outEventIds); } else { return null; } } }; - InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdModifier); - - // It is possible that currentNotificationEventID from metastore is less than newly set event ID by stub function. - // In this case, REPL DUMP will skip events beyond this upper limit. 
- // So, to avoid this failure, we will set the TO clause to ID 100 times of currentNotificationEventID - String cmd = "REPL DUMP " + dbName + " FROM " + replDumpId - + " TO " + String.valueOf(metaStoreClient.getCurrentNotificationEventId().getEventId()*100); + InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper); advanceDumpDir(); - run(cmd, driver); - eventIdModifier.assertInjectionsPerformed(true,false); + verifyFail("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + eventIdSkipper.assertInjectionsPerformed(true,false); InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour - - String incrementalDumpLocn = getResult(0, 0, driver); - String incrementalDumpId = getResult(0, 1, true, driver); - LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); - run("REPL LOAD " + replDbName + " FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); } @Test diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 7ba053d..cf1b286 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2587,19 +2587,25 @@ public NotificationEventResponse getNextNotification(long lastEventId, int maxEv rqst.setMaxEvents(maxEvents); NotificationEventResponse rsp = client.get_next_notification(rqst); LOG.debug("Got back " + rsp.getEventsSize() + " events"); - if (filter == null) { - return rsp; - } else { - NotificationEventResponse filtered = new NotificationEventResponse(); - if (rsp != null && rsp.getEvents() != null) { - for (NotificationEvent e : rsp.getEvents()) { - if 
(filter.accept(e)) { - filtered.addToEvents(e); - } + NotificationEventResponse filtered = new NotificationEventResponse(); + if (rsp != null && rsp.getEvents() != null) { + long nextEventId = lastEventId + 1; + for (NotificationEvent e : rsp.getEvents()) { + if (e.getEventId() != nextEventId) { + LOG.error("Requested events are found missing in NOTIFICATION_LOG table. Expected: {}, Actual: {}. " + + "Probably, cleaner would've cleaned it up. " + + "Try setting higher value for hive.metastore.event.db.listener.timetolive. " + + "Also, bootstrap the system again to get back the consistent replicated state.", + nextEventId, e.getEventId()); + throw new IllegalStateException("Notification events are missing."); + } + if ((filter != null) && filter.accept(e)) { + filtered.addToEvents(e); } + nextEventId++; } - return filtered; } + return (filter != null) ? filtered : rsp; } @InterfaceAudience.LimitedPrivate({"HCatalog"}) diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java index 74c057b..cb51763 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java @@ -2383,19 +2383,25 @@ public NotificationEventResponse getNextNotification(long lastEventId, int maxEv rqst.setMaxEvents(maxEvents); NotificationEventResponse rsp = client.get_next_notification(rqst); LOG.debug("Got back " + rsp.getEventsSize() + " events"); - if (filter == null) { - return rsp; - } else { - NotificationEventResponse filtered = new NotificationEventResponse(); - if (rsp != null && rsp.getEvents() != null) { - for (NotificationEvent e : rsp.getEvents()) { - if (filter.accept(e)) { - filtered.addToEvents(e); - } + NotificationEventResponse filtered = new 
NotificationEventResponse(); + if (rsp != null && rsp.getEvents() != null) { + long nextEventId = lastEventId + 1; + for (NotificationEvent e : rsp.getEvents()) { + if (e.getEventId() != nextEventId) { + LOG.error("Requested events are found missing in NOTIFICATION_LOG table. Expected: {}, Actual: {}. " + + "Probably, cleaner would've cleaned it up. " + + "Try setting higher value for hive.metastore.event.db.listener.timetolive. " + + "Also, bootstrap the system again to get back the consistent replicated state.", + nextEventId, e.getEventId()); + throw new IllegalStateException("Notification events are missing."); + } + if ((filter != null) && filter.accept(e)) { + filtered.addToEvents(e); } + nextEventId++; } - return filtered; } + return (filter != null) ? filtered : rsp; } @InterfaceAudience.LimitedPrivate({"HCatalog"})