Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
Description
Briefly describe the process of sending clientRequest on the Kafka Client side, which is divided into two steps.
1.Selector.send(send) method
Kafka's underlying tcp connection channel ensures that data is sent to the network sequentially. KafkaChannel allows only one send to be set at a time. And the next InFlightRequest is allowed to be added only if the queue.peekFirst().send.completed() condition is met.
NetworkClient.isReady(node) -> NetworkClient.canSendRequest(node) -> InFlightRequests.canSendMore(node)
2. Selector.poll(timeout)
After KafkaChannel sets a send each time, there should be a Selector.poll(timeout) call subsequently. Please refer to the comments on the Selector.send(send) method.
/** * Queue the given request for sending in the subsequent {@link #poll(long)} calls * @param send The request to send */ public void send(NetworkSend send) {
Send may become completed only after the Selector.poll(timeout) method is executed, more detail see Selector.write(channel) methos.
Let's go back and look at this method: ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) method.
There are three places involved in sending data in this method:
long pollDelayMs = trySend(timer.currentTimeMs());
->
client.poll(...)
->
trySend(timer.currentTimeMs());
There are two problems with this process:
1. After calling the trySend(...) method for the second time, we should immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the send generated each time can be consumed by the next Selector.poll() method.
2. The while loop in trySend(...) method can be removed
After a node executes client.send(request, now) for the first time, because the first request will never be completed here, the subsequent requests will never satisfy the client.ready(node, now) condition.
Although the current code will break directly on the second execution of the loop, there will be an additional execution of the loop.
Modify the code as follows:
long trySend(long now) { long pollDelayMs = maxPollTimeoutMs; // send any requests that can be sent now for (Node node : unsent.nodes()) { Iterator<ClientRequest> iterator = unsent.requestIterator(node); if (iterator.hasNext()) { pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now)); if (client.ready(node, now)) { client.send(iterator.next(), now); iterator.remove(); } } } return pollDelayMs; }
3. By the way, the unsent.clean() method that is executed last can also be optimized.
Easier to read the code.
public void clean() { // the lock protects removal from a concurrent put which could otherwise mutate the // queue after it has been removed from the map synchronized (unsent) { unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty); } }
Attachments
Issue Links
- links to