Kafka / KAFKA-13829

The max.in.flight.requests.per.connection parameter does not work because it conflicts with how the underlying NIO layer sends data


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      Because of how the `OP_WRITE` event is handled in Kafka's underlying NIO layer, the `max.in.flight.requests.per.connection` parameter does not take effect. This significantly limits Kafka's network send performance.

      The process by which Kafka's Selector sends a ClientRequest can be divided into two major steps:

      1) Prepare the request data to be sent

      1. NetworkClient.ready(Node node, long now) calls NetworkClient.canSendRequest(...) to determine whether the request is eligible to be sent.

      2. NetworkClient.doSend(...) constructs the `Send send` and caches the request via inFlightRequests.add(inFlightRequest).

      3. KafkaChannel.setSend() is executed:
      it checks that `this.send` is currently null, otherwise an IllegalStateException is thrown;
      it sets `this.send`;
      the transportLayer adds the `OP_WRITE` event.

      2) Selector.poll(long timeout) is then called to consume the send and write its data to the network.

      Selector.poll() -> this.nioSelector.select(timeout)
      Selector.pollSelectionKeys() -> 
      Selector.attemptWrite() -> 
      Selector.write(channel) -> 
      KafkaChannel.write() & KafkaChannel.maybeCompleteSend()

       

      1. Selector.poll() -> this.nioSelector.select(timeout): the previously registered `OP_WRITE` event is selected.

      2. `Selector.attemptWrite()` is executed.

      3. `KafkaChannel.write()` writes the data; once all of it has been sent successfully, `this.send` is in the completed state.

      4. `KafkaChannel.maybeCompleteSend()` checks whether the send is complete; if it is not, it does nothing.

      5. If the send is complete, the transportLayer removes the `OP_WRITE` event and `KafkaChannel.send` is reset to null.

      6. The channel waits for the next request that is ready to be sent.
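      The two phases above can be modelled with a small, self-contained toy. This is a sketch under stated assumptions, not Kafka's code: `ToySend` and `SingleSlotChannel` are hypothetical classes, a `Send` is reduced to a byte counter, the `OP_WRITE` registration is a boolean, and each `poll(budget)` call stands in for one `Selector.poll()` iteration that can write at most `budget` bytes to this channel.

```java
// Hypothetical toy model of the current single-slot design; not Kafka's real classes.
class ToySend {
    private long remaining;
    ToySend(long size) { this.remaining = size; }
    boolean completed() { return remaining <= 0; }
    /** Writes up to budget bytes and returns how many were "written". */
    long writeTo(long budget) {
        long n = Math.min(remaining, budget);
        remaining -= n;
        return n;
    }
}

class SingleSlotChannel {
    private ToySend send;     // at most one pending send, like KafkaChannel.send
    private boolean opWrite;  // stands in for OP_WRITE being registered

    /** Part 1, item 3: reject a second send while one is pending, then register OP_WRITE. */
    void setSend(ToySend s) {
        if (send != null) throw new IllegalStateException("prior send still in progress");
        send = s;
        opWrite = true;
    }

    /** Part 2, item 3: write as much of the current send as the budget allows. */
    void write(long budget) {
        if (send != null) send.writeTo(budget);
    }

    /** Part 2, items 4 and 5: once complete, drop OP_WRITE and clear the slot. */
    ToySend maybeCompleteSend() {
        if (send == null || !send.completed()) return null;
        ToySend done = send;
        send = null;
        opWrite = false;
        return done;
    }

    /** Part 2 as a whole: one simplified Selector.poll() iteration for this channel. */
    ToySend poll(long budget) {
        if (!opWrite) return null; // select() would not report the key as writable
        write(budget);
        return maybeCompleteSend();
    }

    boolean opWrite() { return opWrite; }
    boolean hasSend() { return send != null; }
}
```

      In this model a second `setSend()` fails while the slot is occupied, and the slot is freed only after `maybeCompleteSend()` observes a completed send, which can take several polls for a large send.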

       

      The process above looks sound, but a careful reading of NetworkClient.canSendRequest(...) shows that inFlightRequests.canSendMore(node) contains this condition:

      queue.peekFirst().send.completed()

      In addition, `inFlightRequests.add(inFlightRequest)` inserts the request with addFirst(request), so queue.peekFirst() always returns the most recently added request.
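      A toy version of the check makes the interaction visible. As far as we can tell, Kafka's `InFlightRequests.canSendMore(node)` combines the quoted condition with the size limit, roughly `queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size() < maxInFlightRequestsPerConnection)`; the sketch assumes that shape. `ToyRequest` and `ToyInFlightRequests` are hypothetical names, and the `sendCompleted` flag stands in for `send.completed()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy version of the per-node check; not Kafka's real classes.
class ToyRequest {
    boolean sendCompleted; // stands in for inFlightRequest.send.completed()
}

class ToyInFlightRequests {
    private final Deque<ToyRequest> queue = new ArrayDeque<>();
    private final int maxInFlightRequestsPerConnection;

    ToyInFlightRequests(int max) { this.maxInFlightRequestsPerConnection = max; }

    /** Like inFlightRequests.add(): the newest request goes to the head of the deque. */
    void add(ToyRequest r) { queue.addFirst(r); }

    /** Assumed shape of canSendMore: the head (newest) send must be complete AND the size limit must hold. */
    boolean canSendMore() {
        return queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightRequestsPerConnection);
    }
}
```

      With a limit of 5, one request whose send has not completed already blocks further sends; once it completes, adding the next request closes the gate again, regardless of the configured maximum.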

       

      Currently, KafkaChannel stores only a single send object, not a collection of sends.
      Between the registration and removal of the `OP_WRITE` event, only one send object is written in KafkaChannel.write(), and only one send object is completed in KafkaChannel.maybeCompleteSend().

      Therefore, whether a ClientRequest is eligible to be sent is limited only by queue.peekFirst().send.completed(). The max.in.flight.requests.per.connection parameter loses its effect: setting it to a value greater than 1 behaves the same as setting it to 1.
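      A small simulation, under the same toy assumptions, illustrates this gating: in each poll cycle, requests are handed to the channel while the canSendMore-style check passes, and the poll phase is then assumed to write the pending send completely. `GatingSimulation` and `sendsPerCycle` are hypothetical names. Responses are not modelled, so the simulation only shows how many new requests reach the channel per poll cycle, not how many can be awaiting a response.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy simulation; not Kafka's real classes. Responses are not modelled,
// so every request stays in the in-flight deque once its send has completed.
class GatingSimulation {
    static final class Req { boolean sendCompleted; }

    /** Returns how many requests were handed to the channel in each poll cycle. */
    static List<Integer> sendsPerCycle(int maxInFlight, int backlog, int cycles) {
        Deque<Req> inFlight = new ArrayDeque<>();
        Req channelSlot = null; // the channel holds a single send
        List<Integer> result = new ArrayList<>();
        for (int c = 0; c < cycles; c++) {
            int sent = 0;
            // Prepare phase: keep sending while the assumed canSendMore check passes.
            while (backlog > 0 && (inFlight.isEmpty()
                    || (inFlight.peekFirst().sendCompleted && inFlight.size() < maxInFlight))) {
                if (channelSlot != null) throw new IllegalStateException("slot occupied");
                Req r = new Req();
                inFlight.addFirst(r); // like inFlightRequests.add()
                channelSlot = r;      // like KafkaChannel.setSend()
                backlog--;
                sent++;
            }
            // Poll phase: assume the pending send is written completely in one poll.
            if (channelSlot != null) {
                channelSlot.sendCompleted = true;
                channelSlot = null;
            }
            result.add(sent);
        }
        return result;
    }
}
```

      With `sendsPerCycle(5, 10, 3)` the result is `[1, 1, 1]`: although five requests are allowed in flight, only one new request reaches the channel before each poll.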

       

      Suggestions

      Because of the KafkaClient architecture, we do not need to consider multiple threads executing the `NetworkClient.poll` method concurrently.

      1. Remove the following condition from the check in NetworkClient.canSendRequest(...):

      queue.peekFirst().send.completed()

      The `max.in.flight.requests.per.connection` parameter will then take effect again.

      2. Currently, the `OP_WRITE` event is registered in KafkaChannel.setSend() and removed in KafkaChannel.maybeCompleteSend().
      This may no longer be appropriate. Because we want to send more than one send object between the registration and removal of `OP_WRITE`, it is recommended not to register and remove the `OP_WRITE` event repeatedly, but to register it once in the `transportLayer.finishConnect()` method:

      key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);

      3. KafkaChannel.setSend() no longer caches just one incoming send object; instead, each incoming send object is stored in a sendCollection structure.

      4. When KafkaChannel.write() is executed, it copies sendCollection into midWriteSendCollection, clears sendCollection, and finally sends all the data in midWriteSendCollection.

      5. KafkaChannel.maybeCompleteSend() checks whether midWriteSendCollection contains any completed sends, removes all of them (the completedSends) from midWriteSendCollection, and returns completedSends so they can be added to Selector.completedSends.

      6. Selector.attemptRead(channel) only executes under the precondition `!hasCompletedReceive(channel)`, so the KafkaChannel.receive object does not need to change.

      7. One more point:
      the ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) method
      has three places involved in sending data:

      long pollDelayMs = trySend(timer.currentTimeMs());
      ->
       client.poll(...)
      ->
      trySend(timer.currentTimeMs());

      There is a problem with this sequence:
      after the second trySend(...) call, we should immediately call client.poll(0, timer.currentTimeMs()) to ensure that every send generated is consumed by the next Selector.poll().
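      Suggestions 1 through 5 can be sketched together as a toy model. This is hypothetical and simplified: `ToySend`, `ProposedChannel` and `ProposedInFlight` are illustrative names, while `sendCollection` and `midWriteSendCollection` follow the names used in the suggestions. One assumption differs slightly from the text: `write()` appends `sendCollection` to `midWriteSendCollection` instead of overwriting it, so that a send left partially written by a previous poll is not lost. `interestOpsAfterFinishConnect` only evaluates the bitmask from suggestion 2 using the standard `java.nio.channels.SelectionKey` constants.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy model of the proposal; none of these classes are Kafka's real API.
class ToySend {
    final String id;
    private long remaining;
    ToySend(String id, long size) { this.id = id; this.remaining = size; }
    boolean completed() { return remaining <= 0; }
    /** Writes up to budget bytes and returns how many were "written". */
    long writeTo(long budget) {
        long n = Math.min(remaining, budget);
        remaining -= n;
        return n;
    }
}

class ProposedChannel {
    // Suggestion 3: queue every incoming send instead of holding a single one.
    private final Deque<ToySend> sendCollection = new ArrayDeque<>();
    // Suggestion 4: sends currently being written.
    private final Deque<ToySend> midWriteSendCollection = new ArrayDeque<>();

    /** Suggestion 2: interest set computed once after the connection completes. */
    static int interestOpsAfterFinishConnect(int ops) {
        // '&' binds tighter than '|': (ops & ~OP_CONNECT) | OP_READ | OP_WRITE
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    }

    /** No IllegalStateException: several sends may be pending at once. */
    void setSend(ToySend send) { sendCollection.addLast(send); }

    /** Suggestion 4: move queued sends over, then write them in order within the budget. */
    void write(long budget) {
        midWriteSendCollection.addAll(sendCollection); // append, so a partial send is kept
        sendCollection.clear();
        for (ToySend s : midWriteSendCollection) {
            if (budget <= 0) break;
            budget -= s.writeTo(budget);
        }
    }

    /** Suggestion 5: remove and return every completed send. */
    List<ToySend> maybeCompleteSend() {
        List<ToySend> completedSends = new ArrayList<>();
        Iterator<ToySend> it = midWriteSendCollection.iterator();
        while (it.hasNext()) {
            ToySend s = it.next();
            if (s.completed()) {
                completedSends.add(s);
                it.remove();
            }
        }
        return completedSends;
    }
}

class ProposedInFlight {
    private final Deque<ToySend> queue = new ArrayDeque<>();
    private final int maxInFlightRequestsPerConnection;
    ProposedInFlight(int max) { this.maxInFlightRequestsPerConnection = max; }
    void add(ToySend s) { queue.addFirst(s); }
    /** Suggestion 1: no peekFirst().completed() check, only the size limit. */
    boolean canSendMore() { return queue.isEmpty() || queue.size() < maxInFlightRequestsPerConnection; }
}
```

      In this model, a limit of 3 lets three sends be queued on the channel before the first poll, and a single `write()` can complete several of them. One trade-off to weigh with suggestion 2: a connected socket is usually writable, so if `OP_WRITE` stays registered, `select()` can keep reporting the channel as ready even when no sends are queued.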


          People

            Assignee: Unassigned
            Reporter: RivenSun