KAFKA-200

Support configurable send / receive socket buffer size in server

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: core
    • Labels:
      None

      Description

      • Make the send / receive socket buffer sizes configurable in the server.
      • KafkaConfig.scala already has the following variables to support the send / receive buffers:
        socketSendBuffer
        socketReceiveBuffer
      • The patch attached to this ticket reads the following settings in <kafka>/config/server.properties and sets the corresponding socket buffers:
        . . .
      1. The send buffer (SO_SNDBUF) used by the socket server
        socket.send.buffer=1048576
      2. The receive buffer (SO_RCVBUF) used by the socket server
        socket.receive.buffer=1048576
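As a sketch of the configuration step described here, reading these two properties (with the 1048576-byte defaults above) might look like the following in plain Java. The SocketBufferConfig class is illustrative only, not Kafka's actual KafkaConfig:

```java
import java.util.Properties;

// Illustrative sketch only: reads the two settings named in this ticket from a
// Properties object, defaulting to 1 MB as in the example config above. This is
// not Kafka's actual KafkaConfig implementation.
public class SocketBufferConfig {
    static final int DEFAULT_BUFFER = 1048576; // 1 MB

    final int socketSendBuffer;    // from socket.send.buffer (SO_SNDBUF)
    final int socketReceiveBuffer; // from socket.receive.buffer (SO_RCVBUF)

    SocketBufferConfig(Properties props) {
        socketSendBuffer = Integer.parseInt(
            props.getProperty("socket.send.buffer", String.valueOf(DEFAULT_BUFFER)));
        socketReceiveBuffer = Integer.parseInt(
            props.getProperty("socket.receive.buffer", String.valueOf(DEFAULT_BUFFER)));
    }
}
```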
      Attachments

      1. KAFKA-200-v3.patch
        5 kB
        John Fung
      2. KAFKA-200.patch
        4 kB
        John Fung

        Activity

        Jun Rao added a comment -

        Thanks, John. Just committed this patch.

        John Fung added a comment -

        The KAFKA-200-v3.patch contains the following changes:

        1. Updated ConsoleConsumer to take the argument --socket-buffer-size
        2. Implemented Approach 3 as mentioned above, but with configureBlocking(false) called after accept(); otherwise it won't work properly.

        Jun Rao added a comment -

        John,

        Thanks for the detailed update. From TCP/IP Illustrated, section 13.3.3, last paragraph: "The shift count is automatically chosen by TCP, based on the size of the receive buffer." So, to set a TCP window size larger than 64K, we just need to make sure the receive buffer size is set properly before the connection is established. Approach 3 as you listed is therefore the right thing to do. Could you upload a new patch?

        John Fung added a comment -

        ===============
        Background Information
        ===============

        1. Socket.setReceiveBufferSize() has a restriction specified in the Java API doc: "For client sockets, setReceiveBufferSize() must be called before connecting the socket to its remote peer."

        2. Socket.setSendBufferSize() has no corresponding restriction.

        3. SocketChannel.socket() returns a Socket object, which provides both setReceiveBufferSize() and setSendBufferSize() (however, a SocketChannel is available only after the connection is accepted).

        4. ServerSocketChannel.socket() returns a ServerSocket object, which provides setReceiveBufferSize() but no setSendBufferSize() method (a ServerSocketChannel is available before the connection is accepted).

        5. Hence, the ServerSocketChannel object can be used to call setReceiveBufferSize() before ServerSocketChannel.accept(), which satisfies the restriction in item 1.

        These 5 items of background information lead to the following 3 approaches to setting the send / receive buffer sizes in Kafka (tested on Linux).

        It appears that Approach 3 is the most appropriate in this scenario.

        ============================
        Approach 1 - KAFKA-200.patch (existing patch)

        1. call setReceiveBufferSize after accept()
        2. call setSendBufferSize after accept()
        ============================

        def accept(key: SelectionKey, processor: Processor) {
          val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
          socketChannel.configureBlocking(false)
          socketChannel.socket().setTcpNoDelay(true)
          socketChannel.socket().setSendBufferSize(sendBufferSize)
          socketChannel.socket().setReceiveBufferSize(receiveBufferSize)
          processor.accept(socketChannel)
        }

        =====
        Results:
        =====
        1. On a Linux machine, both send / receive socket buffer sizes are set to the expected values.
        2. It does not comply with the restriction specified in the Java API doc.

        ============================
        Approach 2:

        1. call setReceiveBufferSize before accept()
        2. call setSendBufferSize before accept()
        ============================

        def accept(key: SelectionKey, processor: Processor) {
          val socketChannel = key.channel().asInstanceOf[SocketChannel]
          socketChannel.configureBlocking(false)
          socketChannel.socket().setTcpNoDelay(true)
          socketChannel.socket().setSendBufferSize(sendBufferSize)
          socketChannel.socket().setReceiveBufferSize(receiveBufferSize)
          socketChannel.asInstanceOf[ServerSocketChannel].accept()
          processor.accept(socketChannel)
        }

        =====
        Results:
        =====
        1. Compilation OK.
        2. Runtime error for casting:

        [2011-12-15 21:31:52,986] ERROR Error in acceptor (kafka.network.Acceptor)
        java.lang.ClassCastException
        [2011-12-15 21:31:52,986] ERROR Error in acceptor (kafka.network.Acceptor)
        java.lang.ClassCastException
        . . .

        ============================
        Approach 3:

        1. call setReceiveBufferSize before accept()
        (complying with restriction specified in http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setReceiveBufferSize%28int%29)

        2. call setSendBufferSize after accept()
        (No corresponding restriction to call before accept() )
        ============================

        def accept(key: SelectionKey, processor: Processor) {
          val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
          serverSocketChannel.configureBlocking(false)
          serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
          val socketChannel = serverSocketChannel.accept()
          socketChannel.socket().setTcpNoDelay(true)
          socketChannel.socket().setSendBufferSize(sendBufferSize)
          processor.accept(socketChannel)
        }

        =====
        Results:
        =====
        1. Complies with the restriction specified in the Java API doc.
        2. Both sendBufferSize and receiveBufferSize are set to the expected values:
        [2011-12-15 21:46:46,525] DEBUG sendBufferSize: [1048576] receiveBufferSize: [1048576] (kafka.network.Acceptor)
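
        The Approach 3 ordering can also be exercised outside Kafka in plain Java NIO. The following is a self-contained sketch (loopback connection on an ephemeral port), not Kafka code; note that the OS may clamp or round the granted buffer sizes, so only the call ordering is the point:

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Standalone sketch of the Approach 3 ordering (not Kafka code):
// SO_RCVBUF is set on the listening ServerSocket before accept(),
// SO_SNDBUF on the accepted socket afterwards. The OS may clamp or
// round the granted sizes, so callers should read them back.
public class Approach3Sketch {
    // Returns { granted SO_RCVBUF, granted SO_SNDBUF } of the accepted socket.
    public static int[] acceptWithBuffers(int recvBuf, int sendBuf) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().setReceiveBufferSize(recvBuf);  // before bind/accept
        server.socket().bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port

        // Connect a loopback client so accept() returns immediately.
        SocketChannel client = SocketChannel.open(
            new InetSocketAddress("127.0.0.1", server.socket().getLocalPort()));

        SocketChannel accepted = server.accept();
        accepted.socket().setTcpNoDelay(true);
        accepted.socket().setSendBufferSize(sendBuf);   // no before-accept restriction

        int[] granted = { accepted.socket().getReceiveBufferSize(),
                          accepted.socket().getSendBufferSize() };
        accepted.close();
        client.close();
        server.close();
        return granted;
    }
}
```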

        John Fung added a comment -

        Hi Jay,

        I am going to change it to the way recommended by the doc you have posted above.

        Thanks,
        John

        John Fung added a comment -

        Hi Jay,

        The following are the debug messages from the throughput test, showing that the configured values take effect:

        1. Kafka log messages at the sending side:
        . . .
        [2011-12-13 01:54:57,572] INFO Closing socket connection to /172.17.166.122. (kafka.network.Processor)
        [2011-12-13 01:54:59,390] INFO #### sendBufferSize : set[1048576] get[1048576] (kafka.network.Acceptor)
        [2011-12-13 01:54:59,390] INFO #### sendBufferSize : set[1048576] get[1048576] (kafka.network.Acceptor)
        [2011-12-13 01:54:59,391] INFO #### sendBufferSize : set[1048576] get[1048576] (kafka.network.Acceptor)
        . . .

        The code which prints the messages is from core/src/main/scala/kafka/network/SocketServer.scala:

        def accept(key: SelectionKey, processor: Processor) {
          val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
          if(logger.isDebugEnabled)
            logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
          socketChannel.configureBlocking(false)
          socketChannel.socket().setTcpNoDelay(true)
          socketChannel.socket().setSendBufferSize(sendBufferSize)
          socketChannel.socket().setReceiveBufferSize(receiveBufferSize)

          logger.info("#### sendBufferSize : set[" + sendBufferSize + "] get[" + socketChannel.socket().getSendBufferSize() + "]")

        2. ConsoleConsumer has some debug messages printing from SimpleConsumer at the receiving side:
        . . .
        [2011-12-13 01:58:15,099] DEBUG Connected to /172.17.166.122:10251 for fetching. (kafka.consumer.SimpleConsumer)
        [2011-12-13 01:58:15,160] TRACE requested receive buffer size=512000 actual receive buffer size= 512000 (kafka.consumer.SimpleConsumer)
        . . .

        Jay Kreps added a comment -

        But yes, the way TCP is supposed to work is that it takes the minimum of the size the server or client can support.

        Jay Kreps added a comment -

        Interesting. Can we validate the setting directly by checking getReceiveBufferSize()?

        I think the reason it may be working is that Linux defaults TCP window scaling to on. I think the current approach would not work on Solaris, because it defaults TCP window scaling to off. Or maybe I am wrong. But I think we would be better off setting the buffer size the way the documentation recommends.
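
        The validation suggested here (read the granted size back via getReceiveBufferSize()) can be sketched on an unconnected client socket. Illustrative only; requested and granted values need not match, since the kernel may clamp or round them:

```java
import java.net.Socket;

// Illustrative check: request a receive buffer size on an unconnected socket
// (per the API doc's before-connect restriction), then read back what the
// OS granted. The two values need not match: the kernel may clamp or round.
public class BufferCheck {
    public static int requestedVsGranted(int requested) throws Exception {
        Socket s = new Socket();               // unconnected
        s.setReceiveBufferSize(requested);     // must precede connect()
        int granted = s.getReceiveBufferSize();
        s.close();
        return granted;
    }
}
```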

        John Fung added a comment -

        Hi Jay,

        Values greater than 64K are taking effect in a throughput test carried out recently. Please refer to the following for more details.

        ===================

        A throughput test had been carried out to observe the throughput gain by varying the socket buffer sizes at both the sending and receiving ends.

        The following is the layout of the components to test:

        Sending Colo (L.A.)                    Receiving Colo (Chicago)
        Producer (500MB data) -> Source Kafka <------ ConsoleConsumer

        Environments
        ============

        • Sending Colo : L.A.
        • Receiving Colo : Chicago
        • Send Buffer Sizes for Source Kafka server : 100K, 500K, 1M
        • Receive Buffer Sizes for ConsoleConsumer : 100K, 500K, 1M
        • Hardware for both colos: CPU (2 physical, 16 virtual), 24GB RAM, Linux x86_64

        Testing Steps
        =============
        1. Add the property "socket.send.buffer=102400" (i.e. 100K) in the "kafka.properties" file for the source Kafka server
        2. Start Zookeeper and Kafka in sending colo
        3. Start ProducerPerformance class to produce 500MB of data to source Kafka server and wait until all data is produced.
        4. ConsoleConsumer is modified to timeout after 5 sec without incoming messages to get the total time for consuming 500MB of data.
        5. In receiving colo, specify the argument "--socket-buffer-size 102400" (100K in this case) for ConsoleConsumer
        6. Start ConsoleConsumer to connect to the zookeeper in sending colo and consume the data

        Results
        =======

        Kafka  Cons   Cons   Time   Time
        Send   Recv   Fetch  Taken  Taken
        Buff   Buff   Size   (sec)  (min)
        =====  =====  =====  =====  =====
        100K   100K   1M     313    5.20
        100K   500K   1M     305    5.10
        100K   1M     1M     307    5.12
        500K   100K   1M     311    5.20
        500K   500K   1M     120    2.00
        500K   1M     1M     121    2.00
        1M     100K   1M     311    5.20
        1M     500K   1M     121    2.00
        1M     1M     1M     98     1.63

        1st col: Kafka Send Buffer Size - ("socket.send.buffer") configured in kafka.properties file
        2nd col: Consumer Receive Buffer Size "--socket-buffer-size" - which is a command line argument for ConsoleConsumer class
        3rd col: Consumer Fetch Size
        4th col: Seconds taken by ConsoleConsumer to consume all data
        5th col: Corresponding minutes taken

        Observations
        ============
        The results indicate that the overall throughput is related to the smaller socket buffer size at either the sending or the receiving end. In other words, the smaller socket buffer size acts as a "bottleneck" against the throughput of the pipeline.
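
        A back-of-the-envelope bound is consistent with this: steady-state TCP throughput is at most window / RTT, and the effective window cannot exceed the smaller of the two socket buffers. The ~60 ms L.A.-Chicago round-trip time below is an assumption for illustration, not a value measured in this test:

```java
// Rough TCP throughput bound: min(send buffer, receive buffer) / RTT.
// Assumption for illustration: ~60 ms round-trip time between the colos.
public class ThroughputBound {
    public static double bytesPerSec(int sendBuf, int recvBuf, double rttSec) {
        return Math.min(sendBuf, recvBuf) / rttSec;
    }
}
```

        Under that assumed RTT, a 100K minimum buffer bounds throughput at roughly 1.7 MB/s, i.e. about 300 s for 500 MB, in line with the ~311-313 s rows above; with 1M buffers on both ends the bound rises to ~17 MB/s, so the 98 s run is limited by something other than the socket buffers.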

        Jay Kreps added a comment -

        Hey John, can you validate that this setting actually works for large values (i.e. that the value you set takes effect)? Values larger than 64K require special negotiation at socket creation time, so they can be trickier to make happen. This setting seems to be made on sockets already accepted, so I wonder whether it would take effect, since the TCP window has already been negotiated at that point. Having been bruised by this before, I think it is best to be careful.

        Some docs on this are here: http://docs.oracle.com/javase/1.4.2/docs/api/java/net/Socket.html#setReceiveBufferSize%28int%29

        Key point is that to get the buffer setting to take effect on the sockets the server accepts you actually need to set this on the ServerSocket not the accepted sockets. I think setting it on the socket that is accepted won't have any effect. But don't trust anything I say, best to try it out and see.


          People

          • Assignee: Unassigned
          • Reporter: John Fung
