Description
Context
Kafka Streams applications periodically send DeleteRecords requests via the org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords method. Such a request may involve more than one topic (or partition), and it is supposed to be sent to the brokers that lead the affected partitions.
Observed behaviour
When a DeleteRecords request includes more than one topic (say topic1 and topic2) and these topics are led by different brokers (say broker1 and broker2 respectively), the whole request is sent to only one broker (say broker1), which leads to partial not_leader_or_follower errors. Because the request was only partly successful (topic1 is fine, topic2 is not), it is retried with the same arguments against the same broker (broker1), so the response is partially faulty again and again. A “mirrored” half-faulty request may also occur (and does): it goes to broker2, where the topic2 operation succeeds but the topic1 operation fails.
Here is an anonymised log excerpt from a production system (the “direct” and “mirrored” requests, one after another):
[AdminClient clientId=worker-admin] Sending DeleteRecordsRequestData(topics=[
    DeleteRecordsTopic(
        name='topic1',
        partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
    ),
    DeleteRecordsTopic(
        name='topic2',
        partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
    )], timeoutMs=60000)
to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
correlationId=42003907, timeoutMs=30000

[AdminClient clientId=worker-admin] Sending DeleteRecordsRequestData(topics=[
    DeleteRecordsTopic(
        name='topic1',
        partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
    ),
    DeleteRecordsTopic(
        name='topic2',
        partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
    )], timeoutMs=60000)
to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
correlationId=42003906, timeoutMs=30000
Such a request results in the following response (shown here only for the "direct" request):
[AdminClient clientId=worker-admin] Call(
    callName=deleteRecords(api=DELETE_RECORDS),
    deadlineMs=...,
    tries=..., // Can be hundreds
    nextAllowedTryMs=...)
got response DeleteRecordsResponseData(
    throttleTimeMs=0,
    topics=[
        DeleteRecordsTopicResult(
            name='topic2',
            partitions=[DeleteRecordsPartitionResult(
                partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the errorCode 6, which is not_leader_or_follower
        DeleteRecordsTopicResult(
            name='topic1',
            partitions=[DeleteRecordsPartitionResult(
                partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the errorCode 0, which means the operation was successful
    ]
)
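To make the reading of this response concrete, here is a minimal Java sketch that picks out the partitions answered with error code 6. It is only an illustration and not AdminClient code: the class name DeleteRecordsResponseCheck, the method misroutedPartitions and the plain "topic-partition" string keys are assumptions made for this example. The two error code values are the ones visible in the response above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka code base.
public class DeleteRecordsResponseCheck {

    // Kafka protocol error codes that appear in the response above.
    static final short NONE = 0;
    static final short NOT_LEADER_OR_FOLLOWER = 6;

    /** Returns the partitions whose result carries NOT_LEADER_OR_FOLLOWER. */
    public static List<String> misroutedPartitions(Map<String, Short> errorCodeByPartition) {
        List<String> misrouted = new ArrayList<>();
        for (Map.Entry<String, Short> e : errorCodeByPartition.entrySet()) {
            if (e.getValue() == NOT_LEADER_OR_FOLLOWER) {
                misrouted.add(e.getKey());
            }
        }
        return misrouted;
    }

    public static void main(String[] args) {
        // Error codes per partition, as returned by broker1 in the log above.
        Map<String, Short> response = new LinkedHashMap<>();
        response.put("topic2-5", NOT_LEADER_OR_FOLLOWER);
        response.put("topic1-5", NONE);
        System.out.println(misroutedPartitions(response));
    }
}
```

Only the partitions returned by such a check need a different destination broker. Resending the entire request to the same broker, as observed, cannot clear the error.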
Expected behaviour
When a DeleteRecords request involves more than one topic/partition and these are led by different brokers, the deletions are sent to the leader broker of each corresponding partition.
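The expected routing can be sketched as grouping the partitions by their current leader before sending, so that each broker only receives the partitions it leads. The Java snippet below is a dependency-free illustration under stated assumptions, not the actual AdminClient implementation: the class DeleteRecordsByLeader, the method groupByLeader and the "topic-partition" string keys are hypothetical. The offsets and broker ids are taken from the logs above, where broker1 has id 2 and leads topic1, and broker2 has id 4 and leads topic2.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the Kafka code base.
public class DeleteRecordsByLeader {

    /**
     * Groups "partition -> delete-before offset" entries by the id of the
     * broker that leads each partition. Fails if a leader is unknown.
     */
    public static Map<Integer, Map<String, Long>> groupByLeader(
            Map<String, Long> offsetsByPartition,
            Map<String, Integer> leaderByPartition) {
        Map<Integer, Map<String, Long>> perBroker = new TreeMap<>();
        for (Map.Entry<String, Long> e : offsetsByPartition.entrySet()) {
            Integer leader = leaderByPartition.get(e.getKey());
            if (leader == null) {
                throw new IllegalStateException("No known leader for " + e.getKey());
            }
            perBroker.computeIfAbsent(leader, id -> new LinkedHashMap<>())
                     .put(e.getKey(), e.getValue());
        }
        return perBroker;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("topic1-5", 88017574L);
        offsets.put("topic2-5", 243841L);

        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("topic1-5", 2); // broker1 (id 2) leads topic1, partition 5
        leaders.put("topic2-5", 4); // broker2 (id 4) leads topic2, partition 5

        // One entry per broker, each holding only the partitions it leads.
        System.out.println(groupByLeader(offsets, leaders));
    }
}
```

With the data from the logs this yields two groups, one per broker, instead of a single request that carries both topics to one broker.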
Notes
- Presumably introduced in 3.6.1 via https://github.com/apache/kafka/pull/13760.