diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 22d5876..cfa60fc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -803,6 +803,9 @@ public void testIncrementalLoadWithVariableLengthEventId() throws IOException, T
       @Override
       public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
         if (null != eventIdList) {
+          if (injectionPathCalled) {
+            return eventIdList;
+          }
           List<NotificationEvent> eventIds = eventIdList.getEvents();
           List<NotificationEvent> outEventIds = new ArrayList<>();
           for (int i = 0; i < eventIds.size(); i++) {
@@ -878,6 +881,58 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
   }
 
   @Test
+  public void testIncrementalReplWithEventsMissing() throws IOException, TException {
+    String testName = "incrementalReplWithEventsMissing";
+    String dbName = createDB(testName, driver);
+    String replDbName = dbName + "_dupe";
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
+    String replDumpId = bootstrapDump.lastReplId;
+
+    // CREATE_TABLE - INSERT - TRUNCATE - INSERT - The result is just one record.
+    String[] unptn_data = new String[]{ "eleven" };
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned values('ten')", driver);
+    run("TRUNCATE TABLE " + dbName + ".unptned", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
+
+    // Inject a behaviour where some events are missing from the notification_log table.
+    // This ensures the incremental dump doesn't get all events for replication.
+    BehaviourInjection<NotificationEventResponse, NotificationEventResponse> eventIdSkipper
+        = new BehaviourInjection<NotificationEventResponse, NotificationEventResponse>(){
+
+      @Nullable
+      @Override
+      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+        if (null != eventIdList) {
+          List<NotificationEvent> eventIds = eventIdList.getEvents();
+          List<NotificationEvent> outEventIds = new ArrayList<>();
+          for (int i = 0; i < eventIds.size(); i++) {
+            NotificationEvent event = eventIds.get(i);
+
+            // Skip the initial 2 events.
+            if (i < 2) {
+              injectionPathCalled = true;
+              continue;
+            }
+            outEventIds.add(event);
+          }
+
+          // Return the new list.
+          return new NotificationEventResponse(outEventIds);
+        } else {
+          return null;
+        }
+      }
+    };
+    InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
+
+    advanceDumpDir();
+    verifyFail("REPL DUMP " + dbName + " FROM " + replDumpId, driver);
+    eventIdSkipper.assertInjectionsPerformed(true, false);
+    InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+  }
+
+  @Test
   public void testDrops() throws IOException {
     String name = testName.getMethodName();
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index ce0757c..909fc91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -136,8 +136,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
     // factory per event to decode. For now, however, since all messages have the
     // same factory, restricting by message format is effectively a guard against
     // older leftover data that would cause us problems.
-
-    work.overrideEventTo(getHive());
+    IMetaStoreClient metastoreClient = getHive().getMSC();
+    work.overrideEventTo(metastoreClient);
 
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
@@ -145,7 +145,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
         new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
 
     EventUtils.MSClientNotificationFetcher evFetcher
-        = new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+        = new EventUtils.MSClientNotificationFetcher(metastoreClient);
 
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
@@ -164,8 +164,32 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
       dumpEvent(ev, evRoot, cmRoot);
     }
 
-    replLogger.endLog(lastReplId.toString());
+    // The cleaner thread concurrently removes expired events from the NOTIFICATION_LOG table, so it
+    // is necessary to check whether the current dump missed any events. The cleaner is guaranteed to
+    // delete events sequentially based on event id/eventTime, so it is enough to verify that the last
+    // previously replicated event is still present in the NOTIFICATION_LOG table after all the
+    // events have been dumped.
+    // Get the last replicated event based on the last event id (work.eventFrom) from the NOTIFICATION_LOG table.
+    // Event ids start at 1, so if the from event id is 0, look for the event with id 1.
+    long expectedLastEventId = (work.eventFrom == 0) ? 1 : work.eventFrom;
+    long actualLastEventId = 0;
+
+    // getNextNotification(expectedLastEventId - 1, ...) returns the event with id == expectedLastEventId, if it still exists.
+    List<NotificationEvent> events = metastoreClient.getNextNotification(expectedLastEventId - 1, 1, null).getEvents();
+    if (!events.isEmpty()) {
+      assert (events.size() == 1);
+      actualLastEventId = events.get(0).getEventId();
+    }
+    if (actualLastEventId != expectedLastEventId) {
+      LOG.error("Requested events are missing from the NOTIFICATION_LOG table. Expected: {}, Actual: {}. " +
+              "The cleaner has probably removed them already. " +
+              "Try setting a higher value for hive.metastore.event.db.listener.timetolive. " +
+              "Also, bootstrap the system again to get back to a consistent replicated state.",
+              expectedLastEventId, actualLastEventId);
+      throw new IllegalStateException("Notification events are missing.");
+    }
+    replLogger.endLog(lastReplId.toString());
 
     LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
     Utils.writeOutput(
         Arrays.asList(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 323c73d..c58c25d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import com.google.common.primitives.Ints;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.slf4j.LoggerFactory;
 
@@ -65,9 +65,9 @@ int maxEventLimit() throws Exception {
     return maxEventLimit;
   }
 
-  void overrideEventTo(Hive fromDb) throws Exception {
+  void overrideEventTo(IMetaStoreClient metastoreClient) throws Exception {
     if (eventTo == null) {
-      eventTo = fromDb.getMSC().getCurrentNotificationEventId().getEventId();
+      eventTo = metastoreClient.getCurrentNotificationEventId().getEventId();
       LoggerFactory.getLogger(this.getClass())
           .debug("eventTo not specified, using current event id : {}", eventTo);
     }