Description
When the fetch request times out, the future is completed from the "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that tryCompleteFetchRequest is always called from the same thread, so the timeout path violates this invariant.
return future.handle((completionTimeMs, exception) -> {
    if (exception != null) {
        Throwable cause = exception instanceof ExecutionException ?
            exception.getCause() : exception;

        // If the fetch timed out in purgatory, it means no new data is available,
        // and we will complete the fetch successfully. Otherwise, if there was
        // any other error, we need to return it.
        Errors error = Errors.forException(cause);
        if (error != Errors.REQUEST_TIMED_OUT) {
            logger.info("Failed to handle fetch from {} at {} due to {}",
                replicaId, fetchPartition.fetchOffset(), error);
            return buildEmptyFetchResponse(error, Optional.empty());
        }
    }

    // FIXME: `completionTimeMs`, which can be null
    logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
        replicaId, fetchPartition.fetchOffset(), completionTimeMs);

    return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
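One solution is to always build an empty response if the future was completed exceptionally, instead of falling through to tryCompleteFetchRequest on the completing thread. This works because the ExpirationService completes the future with a `TimeoutException`, so a timeout always reaches the callback as an exception.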
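A minimal sketch of that first option, using only the JDK so it can run outside Kafka. The `Response` class, the string error codes, and the `tryCompleteFetchRequest` supplier are hypothetical stand-ins for the real response type, `Errors`, and the KafkaRaftClient method. The point it illustrates is that the exceptional path, which may run on the expiration thread, never reaches the log-reading call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class FetchCompletionSketch {
    // Hypothetical stand-in for a fetch response: an error code and whether it carries data.
    static final class Response {
        final String error;
        final boolean empty;

        Response(String error, boolean empty) {
            this.error = error;
            this.empty = empty;
        }
    }

    // tryCompleteFetchRequest is a hypothetical supplier standing in for the
    // thread-confined call; it must only ever run on the raft thread.
    static CompletableFuture<Response> handleFetch(
            CompletableFuture<Long> future,
            Supplier<Response> tryCompleteFetchRequest) {
        return future.handle((completionTimeMs, exception) -> {
            if (exception != null) {
                Throwable cause = exception instanceof ExecutionException
                    ? exception.getCause() : exception;
                // A timeout means no new data: answer with an empty, error-free response.
                // Any other failure is also answered with an empty response, carrying an error.
                // Either way, thread-confined state is never touched on this path.
                String error = cause instanceof TimeoutException
                    ? "NONE" : "UNKNOWN_SERVER_ERROR";
                return new Response(error, true);
            }
            // Normal completion is assumed to happen on the raft thread.
            return tryCompleteFetchRequest.get();
        });
    }
}
```

With this shape, completing the future via `completeExceptionally(new TimeoutException())` from any thread yields an empty response without invoking the supplier, while a normal `complete(...)` still goes through it.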
A longer-term solution is to use a more flexible event executor service, one that allows more kinds of events to be scheduled or submitted to the KRaft thread.
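One way to picture the longer-term idea, as a rough sketch only: a single-threaded executor stands in for the KRaft thread, and the completion callback is registered with `CompletableFuture.handleAsync` so it runs on that executor no matter which thread completes the future. The class name, the thread name "kraft-event-thread", and the helper methods are assumptions made for illustration. KafkaRaftClient does not expose such an executor today.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RaftEventExecutorSketch {
    // Hypothetical single-threaded executor standing in for the KRaft thread.
    static ExecutorService newRaftExecutor() {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "kraft-event-thread");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Registers the completion callback on the given executor and reports the
    // name of the thread that actually ran it.
    static CompletableFuture<String> completeOnRaftThread(
            CompletableFuture<Long> future,
            ExecutorService raftExecutor) {
        return future.handleAsync(
            // Runs on raftExecutor even if the future was completed elsewhere,
            // for example by the expiration timer thread.
            (completionTimeMs, exception) -> Thread.currentThread().getName(),
            raftExecutor);
    }
}
```

In this sketch the timer thread only completes the future. The work that touches thread-confined state is submitted as an event to the single designated thread, which preserves the single-thread invariant.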