Briefly describe the process of sending clientRequest on the Kafka Client side, which is divided into two steps.
Kafka's underlying tcp connection channel ensures that data is sent to the network sequentially. KafkaChannel allows only one send to be set at a time. And the next InFlightRequest is allowed to be added only if the queue.peekFirst().send.completed() condition is met.
After KafkaChannel sets a send each time, there should be a Selector.poll(timeout) call subsequently. Please refer to the comments on the Selector.send(send) method.
Send may become completed only after the Selector.poll(timeout) method is executed, more detail see Selector.write(channel) methos.
Let's go back and look at this method: ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) method.
There are three places involved in sending data in this method:
There are two problems with this process:
1. After calling the trySend(...) method for the second time, we should immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the send generated each time can be consumed by the next Selector.poll() method.
2. The while loop in trySend(...) method can be removed
After a node executes client.send(request, now) for the first time, because the first request will never be completed here, the subsequent requests will never satisfy the client.ready(node, now) condition.
Although the current code will break directly on the second execution of the loop, there will be an additional execution of the loop.
Modify the code as follows：
3. By the way, the unsent.clean() method that is executed last can also be optimized.
Easier to read the code.