Description
How to reproduce
1. Use kafka-clients 3.x.x with the producer config enable.idempotence=true (this is the default).
2. Start the Kafka server without the client user's authentication info configured.
3. Start the client producer. Since 3.x, the producer sends InitProducerId on startup and the TCM (TransactionManager) state transitions to INITIALIZING.
4. The server rejects the client request and the producer raises an
AuthenticationException (org.apache.kafka.clients.producer.internals.Sender#maybeSendAndPollTransactionalRequest).
5. In kafka-clients, org.apache.kafka.clients.producer.internals.Sender#runOnce catches the
AuthenticationException
and calls transactionManager.authenticationFailed(e);
synchronized void authenticationFailed(AuthenticationException e)
This method only handles pending requests; the in-flight request is not handled.
6. The TCM state stays in INITIALIZING forever,
because of the guard condition: currentState != State.INITIALIZING && !hasProducerId(), which can never be true while the state is INITIALIZING.
7. The producer sends a message. The message goes into the batch queue, and the Sender wakes up, sets pollTimeout=0, and prepares to send the message.
8. However, before the Sender sends any producer data (sendProducerData), the messages are filtered: RecordAccumulator drain > drainBatchesForOneNode > shouldStopDrainBatchesForPartition.
When producerIdAndEpoch.isValid() == false, shouldStopDrainBatchesForPartition returns true, so no messages are collected.
9. The Kafka producer network thread's CPU usage now goes to 100%.
10. Even if we then add the user's authentication info and permissions on the Kafka server, the producer cannot self-heal.
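The busy loop in steps 6 to 9 can be illustrated with a small toy model. This is only a sketch of the control flow described above, not the real kafka-clients code: the class StuckSenderModel, its fields, and the 100 ms idle timeout are all hypothetical names and values chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: mimics the control flow from the steps above.
// It does not use or reproduce the real kafka-clients classes.
public class StuckSenderModel {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    static final long NO_PRODUCER_ID = -1L;

    // State after the rejected InitProducerId: still INITIALIZING, no producer id.
    State state = State.INITIALIZING;
    long producerId = NO_PRODUCER_ID;
    final Queue<String> batches = new ArrayDeque<>();

    // Step 6: the guard that would trigger a new InitProducerId request.
    boolean shouldRetryInit() {
        return state != State.INITIALIZING && producerId == NO_PRODUCER_ID;
    }

    // Step 8: nothing is drained while the producer id is invalid.
    int drain() {
        if (producerId == NO_PRODUCER_ID) {
            return 0;
        }
        int drained = batches.size();
        batches.clear();
        return drained;
    }

    // Step 7: queued data makes the loop poll with a zero timeout.
    // The 100 ms idle value is an arbitrary assumption for this sketch.
    long pollTimeoutMs() {
        return batches.isEmpty() ? 100L : 0L;
    }

    // Counts loop iterations that neither block in poll nor send anything.
    int countBusyIterations(int iterations) {
        int busy = 0;
        for (int i = 0; i < iterations; i++) {
            if (shouldRetryInit()) {
                state = State.INITIALIZING; // never reached in this scenario
            }
            int sent = drain();
            if (sent == 0 && pollTimeoutMs() == 0L) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        StuckSenderModel model = new StuckSenderModel();
        model.batches.add("record-batch");
        // Every iteration spins: prints 1000
        System.out.println(model.countBusyIterations(1000));
    }
}
```

In this model every iteration returns immediately without sending, which corresponds to the 100% CPU usage on the network thread.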
Suggestion:
Also catch AuthenticationException in org.apache.kafka.clients.producer.internals.Sender#maybeSendAndPollTransactionalRequest and fail the in-flight InitProducerId request.
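A minimal sketch of the idea behind this suggestion, again as a toy model rather than a patch against kafka-clients. The class AuthFailureFixModel, the Handler type, and both method names are hypothetical. Resetting the state to UNINITIALIZED after failing the in-flight request is an assumption made here so that the retry guard can pass again; the real fix may handle the state differently.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: contrasts failing just the pending requests with
// also failing the in-flight request. Not the real kafka-clients code.
public class AuthFailureFixModel {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    // Hypothetical stand-in for a request handler; records its failure.
    static final class Handler {
        final String name;
        RuntimeException failure;

        Handler(String name) {
            this.name = name;
        }

        void fail(RuntimeException e) {
            failure = e;
        }
    }

    State state = State.UNINITIALIZED;
    final Queue<Handler> pending = new ArrayDeque<>();
    Handler inFlight;

    // Models sending InitProducerId: the request is now in flight.
    void sendInitProducerId() {
        state = State.INITIALIZING;
        inFlight = new Handler("InitProducerId");
    }

    // Behaviour described in step 5: only pending requests are failed.
    void authenticationFailedPendingOnly(RuntimeException e) {
        for (Handler h : pending) {
            h.fail(e);
        }
        pending.clear();
    }

    // Suggested behaviour: also fail the in-flight request.
    // Assumption: the state is reset so initialization can be retried.
    void authenticationFailedIncludingInFlight(RuntimeException e) {
        authenticationFailedPendingOnly(e);
        if (inFlight != null) {
            inFlight.fail(e);
            inFlight = null;
            state = State.UNINITIALIZED;
        }
    }

    // Same shape as the guard in step 6 (no producer id assumed).
    boolean canRetryInit() {
        return state != State.INITIALIZING;
    }

    public static void main(String[] args) {
        RuntimeException authError = new RuntimeException("authentication failed");

        AuthFailureFixModel current = new AuthFailureFixModel();
        current.sendInitProducerId();
        current.authenticationFailedPendingOnly(authError);
        System.out.println(current.canRetryInit()); // prints false

        AuthFailureFixModel suggested = new AuthFailureFixModel();
        suggested.sendInitProducerId();
        suggested.authenticationFailedIncludingInFlight(authError);
        System.out.println(suggested.canRetryInit()); // prints true
    }
}
```

With the in-flight request failed, the model leaves INITIALIZING, so a later attempt (for example after the credentials are added on the server) can issue InitProducerId again.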