Description
The FetcherTest and FetchRequestManagerTest classes both contain a test named testFetchedRecordsAfterSeek. On closer inspection, this test may reveal incorrect logic in FetchCollector.handleInitializeErrors.
Here is the test code:
    @Test
    public void testFetchedRecordsAfterSeek() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(),
                new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);

        assignFromUser(singleton(tp0));

        // Step 1: seek to offset 0 of our partition.
        subscriptions.seek(tp0, 0);

        // Step 2: issue a mock broker request to fetch data from the current offset in our local state,
        // i.e. offset 0.
        assertTrue(sendFetches() > 0);

        // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

        // Step 4: process the network I/O to receive the response from the broker with the OFFSET_OUT_OF_RANGE
        // that was injected. Note, however, that we haven't "collected" the fetch data included in the response.
        networkClientDelegate.poll(time.timer(0));

        // Step 5: validate that the partition is not marked as needing its offset reset. The response validation
        // logic is performed during the fetch collection, which doesn't happen until assertEmptyFetch below.
        assertFalse(subscriptions.isOffsetResetNeeded(tp0));

        // Step 6: update the partition's position in our local state to offset 2. We still haven't collected the
        // fetch, so we haven't performed any validation of the fetch response.
        subscriptions.seek(tp0, 2);

        // Step 7: perform the fetch collection. As part of that process, error handling is performed. Since
        // we intentionally injected an error above, this error will be checked and handled in the
        // FetchCollector.handleInitializeErrors method. When handling OFFSET_OUT_OF_RANGE, handleInitializeErrors
        // will notice that the original requested offset (0) is different from the state of our current offset (2).
        assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition");
    }
Here is the code from FetchCollector.handleInitializeErrors:
    private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) {
        final TopicPartition tp = completedFetch.partition;
        final long fetchOffset = completedFetch.nextFetchOffset();

        . . .

        if (error == Errors.OFFSET_OUT_OF_RANGE) {
            Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

            if (!clearedReplicaId.isPresent()) {
                // If there's no preferred replica to clear, we're fetching from the leader so handle
                // this error normally
                SubscriptionState.FetchPosition position = subscriptions.position(tp);

                if (position == null || fetchOffset != position.offset) {
                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                            "does not match the current offset {}", tp, fetchOffset, position);
                } else {
                    String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;

                    if (subscriptions.hasDefaultOffsetResetPolicy()) {
                        log.info("{}, resetting offset", errorMessage);
                        subscriptions.requestOffsetReset(tp);
                    } else {
                        log.info("{}, raising error to the application since no reset policy is configured",
                                errorMessage);
                        throw new OffsetOutOfRangeException(errorMessage,
                                Collections.singletonMap(tp, position.offset));
                    }
                }
            } else {
                log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
                        clearedReplicaId.get(), tp, error, fetchOffset);
            }
        }

        . . .
    }
The question is: why is the OFFSET_OUT_OF_RANGE error silently ignored whenever the following condition is true?
if (position == null || fetchOffset != position.offset) {
It also seems odd that this check is performed only for the OFFSET_OUT_OF_RANGE error rather than for every error.
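To make the behavior in question easier to reason about, here is a minimal, self-contained sketch of the decision made on the leader path of the OFFSET_OUT_OF_RANGE branch (the case where no preferred read replica was cleared). It is not Kafka code: the StaleFetchCheck class, the Outcome enum, and the decide method are hypothetical names introduced only for illustration, and the current position is modeled as an Optional<Long> instead of a SubscriptionState.FetchPosition.

```java
import java.util.Optional;

// Hypothetical, simplified model of the OFFSET_OUT_OF_RANGE handling shown above.
// It covers only the leader path and is not part of the Kafka code base.
final class StaleFetchCheck {

    enum Outcome { DISCARD_STALE, RESET_OFFSET, RAISE_ERROR }

    static Outcome decide(long fetchOffset, Optional<Long> currentPosition, boolean hasDefaultResetPolicy) {
        // Mirrors: if (position == null || fetchOffset != position.offset)
        if (!currentPosition.isPresent() || fetchOffset != currentPosition.get()) {
            return Outcome.DISCARD_STALE;
        }
        // The position still matches the offset that was fetched, so the error is acted upon.
        return hasDefaultResetPolicy ? Outcome.RESET_OFFSET : Outcome.RAISE_ERROR;
    }

    public static void main(String[] args) {
        // Test scenario: fetch sent at offset 0, seek to 2 before collection, no reset policy.
        System.out.println(decide(0L, Optional.of(2L), false)); // prints DISCARD_STALE
        // Same error without the intervening seek: the error reaches the application.
        System.out.println(decide(2L, Optional.of(2L), false)); // prints RAISE_ERROR
    }
}
```

Under this model, the seek in step 6 of the test is the only reason the injected error is dropped: with the position left at the fetched offset and no reset policy configured, the same response would raise an OffsetOutOfRangeException.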