KAFKA-5503, we have added a check (https://github.com/apache/kafka/pull/5881) for `running` flag in the loop inside maybeWaitForProducerId. This is to handle concurrent call to Sender close(), while we attempt to get the ProducerId.
This avoids blocking indefinitely when the producer is shutting down.
This created a corner case, where Sender thread gets blocked, if we had concurrent producerId reset and call to Sender thread close. The proposed fix is to check the forceClose flag in the loop inside maybeWaitForProducerId.