Description
We have a Kafka cluster running version 3.5.2 that we are upgrading to 3.6.1. Many clients on this cluster have exactly-once semantics enabled and therefore use transactions. As we replaced brokers with the new binaries, we observed many clients in the cluster hitting the following error:
2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=<client>, transactionalId=<transactionalId>] Got error produce response with correlation id 6402937 on topic-partition <topic-partition>, retrying (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The server disconnected before a response was received.
On inspecting the brokers, we saw the following errors on those still running Kafka version 3.5.2:
message: Closing socket for <ChannelId> because of error
exception_exception_class: org.apache.kafka.common.errors.InvalidRequestException
exception_exception_message: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
exception_stacktrace:
org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
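To make the failure mode concrete, below is a minimal sketch of the receiving-side check, assuming the 3.5.2 broker has only versions 0 to 3 of ADD_PARTITIONS_TO_TXN enabled (an assumption for illustration). The class RequestVersionCheck and its methods are hypothetical and are not Kafka's request-handling code; IllegalArgumentException stands in for InvalidRequestException. Because the broker closes the socket rather than replying, the sending broker only ever observes a disconnect.

```java
import java.util.Map;

// Simplified model of how a broker rejects a request version it has not enabled.
// All names are hypothetical; this is not Kafka's actual request-handling code.
final class RequestVersionCheck {

    // Assumed for illustration: the 3.5.2 broker enables ADD_PARTITIONS_TO_TXN v0..v3 only.
    static final Map<String, Short> MAX_ENABLED_VERSION =
            Map.of("ADD_PARTITIONS_TO_TXN", (short) 3);

    // Returns true if this broker would accept the given API key and version.
    static boolean isVersionEnabled(String apiKey, short version) {
        Short max = MAX_ENABLED_VERSION.get(apiKey);
        return max != null && version >= 0 && version <= max;
    }

    // Mirrors the logged behaviour: an unsupported version raises an error,
    // after which the broker closes the socket instead of sending a response.
    // IllegalArgumentException stands in for Kafka's InvalidRequestException.
    static void validate(String apiKey, short version) {
        if (!isVersionEnabled(apiKey, version)) {
            throw new IllegalArgumentException("Received request api key " + apiKey
                    + " with version " + version + " which is not enabled");
        }
    }

    public static void main(String[] args) {
        validate("ADD_PARTITIONS_TO_TXN", (short) 3); // accepted
        try {
            validate("ADD_PARTITIONS_TO_TXN", (short) 4);
        } catch (IllegalArgumentException e) {
            System.out.println("closing socket: " + e.getMessage());
        }
    }
}
```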
On the new brokers running 3.6.1 we saw the following errors:
[AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for node 1043 with a network exception.
We also saw the following:
[AddPartitionsToTxnManager broker=1055]Cancelled in-flight ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 being disconnected (elapsed time since creation: 11ms, elapsed time since send: 4ms, request timeout: 30000ms)
While investigating the issue and digging through the changes in 3.6, we came across changes introduced as part of KAFKA-14402 that we think might cause this behaviour.
First, we noticed that transaction.partition.verification.enable is enabled by default and activates a new code path that culminates in the broker sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers; these requests are generated here.
In a discussion on the mailing list, jolshan pointed out that this scenario should not be possible, because the following code paths should prevent version 4 ADD_PARTITIONS_TO_TXN requests from being sent to other brokers:
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195
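The intent of that safeguard can be illustrated with a simplified sketch of version negotiation: the sender picks the highest version in the overlap of its own allowed range and the range the destination broker advertises, and refuses to send if there is no overlap. This is not a copy of NodeApiVersions.latestUsableVersion; the class VersionNegotiation is hypothetical, IllegalStateException stands in for Kafka's own exception, and the version ranges are assumptions (the inter-broker verification request needing v4, a 3.6.1 broker advertising v0..v4, a 3.5.2 broker advertising v0..v3).

```java
// Simplified sketch of per-node version negotiation (hypothetical names, assumed ranges).
final class VersionNegotiation {

    // Inclusive [min, max] range of supported versions for one API.
    record VersionRange(short min, short max) {}

    // Returns the highest version in the intersection of the sender's allowed range
    // and the broker's advertised range; throws if the ranges do not overlap.
    static short latestUsableVersion(VersionRange sender, VersionRange broker) {
        short low = (short) Math.max(sender.min(), broker.min());
        short high = (short) Math.min(sender.max(), broker.max());
        if (low > high) {
            throw new IllegalStateException("No usable version: sender " + sender + ", broker " + broker);
        }
        return high;
    }

    public static void main(String[] args) {
        // Assumed: the inter-broker verification request can only be sent as v4.
        VersionRange verifyRequest = new VersionRange((short) 4, (short) 4);
        VersionRange newBroker = new VersionRange((short) 0, (short) 4);
        VersionRange oldBroker = new VersionRange((short) 0, (short) 3);

        System.out.println(latestUsableVersion(verifyRequest, newBroker));
        try {
            latestUsableVersion(verifyRequest, oldBroker);
        } catch (IllegalStateException e) {
            // With version info available, the request is rejected before it is sent.
            System.out.println("not sent: " + e.getMessage());
        }
    }
}
```

In this model, the old broker should never receive a v4 request, provided the sender actually knows the broker's advertised range.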
However, these requests are still sent to other brokers in our environment.
On further inspection of the code, I am wondering if the following code path could lead to this issue:
In this scenario, no NodeApiVersions are available for the specified nodeId, so the latestUsableVersion check may be skipped entirely. Could it be that, because discoverBrokerVersions is set to false for the network client of the AddPartitionsToTxnManager, the client never fetches NodeApiVersions? I can see that we create the network client here:
The NetworkUtils.buildNetworkClient method seems to create a network client that has discoverBrokerVersions set to false.
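The hypothesis can be modelled with a small sketch: if version discovery is disabled, no per-node version info is ever recorded, so the sender falls back to the request builder's latest allowed version without any compatibility check, and the mismatch is only caught by the receiving broker. All names below (VersionFallback, onConnect, chooseVersion) are hypothetical and only approximate the real client; the version ranges are the same assumptions as before, and node 1043 is taken from the logs above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the suspected path (hypothetical names, assumed version ranges).
final class VersionFallback {

    record VersionRange(short min, short max) {}

    // Per-node advertised versions; only populated when discovery is enabled.
    private final Map<Integer, VersionRange> apiVersionsByNode = new HashMap<>();
    private final boolean discoverBrokerVersions;

    VersionFallback(boolean discoverBrokerVersions) {
        this.discoverBrokerVersions = discoverBrokerVersions;
    }

    // Stand-in for the version handshake on connect; skipped when discovery is off.
    void onConnect(int nodeId, VersionRange advertised) {
        if (discoverBrokerVersions) {
            apiVersionsByNode.put(nodeId, advertised);
        }
    }

    // Chooses the version to send to nodeId for a request allowed in [builder.min, builder.max].
    short chooseVersion(int nodeId, VersionRange builder) {
        VersionRange known = apiVersionsByNode.get(nodeId);
        if (known == null) {
            // No version info for this node: the latest allowed version is used unchecked.
            return builder.max();
        }
        short low = (short) Math.max(builder.min(), known.min());
        short high = (short) Math.min(builder.max(), known.max());
        if (low > high) {
            throw new IllegalStateException("Node " + nodeId + " does not support " + builder);
        }
        return high;
    }

    public static void main(String[] args) {
        VersionRange verifyRequest = new VersionRange((short) 4, (short) 4);
        VersionRange oldBroker = new VersionRange((short) 0, (short) 3);

        VersionFallback noDiscovery = new VersionFallback(false);
        noDiscovery.onConnect(1043, oldBroker);
        // Sends v4 to a broker that only enables v0..v3.
        System.out.println("without discovery: v" + noDiscovery.chooseVersion(1043, verifyRequest));

        VersionFallback withDiscovery = new VersionFallback(true);
        withDiscovery.onConnect(1043, oldBroker);
        try {
            withDiscovery.chooseVersion(1043, verifyRequest);
        } catch (IllegalStateException e) {
            System.out.println("with discovery: rejected before sending");
        }
    }
}
```

If this model matches the real client, the safeguard linked above would never be consulted for the network client built with discoverBrokerVersions set to false.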
I was hoping I could get some assistance debugging this issue. Happy to provide any additional information needed.