Kafka / KAFKA-13457

SocketChannel in Acceptor#accept is not closed upon IOException

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.8.0
    • Fix Version/s: 3.2.0
    • Component/s: network
    • Labels: None

    Description

      When kafka.network.Acceptor (in SocketServer.scala) accepts a new connection in its `accept` function, it handles `TooManyConnectionsException` and `ConnectionThrottledException`. However, the socketChannel operations inside the try block (lines 720 to 722) can also throw an IOException, which is not handled.

       

      //core/src/main/scala/kafka/network/SocketServer.scala
      // Acceptor class
        private def accept(key: SelectionKey): Option[SocketChannel] = {
          val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
          val socketChannel = serverSocketChannel.accept()     // line 717
          try {
            connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
            socketChannel.configureBlocking(false)             // line 720
            socketChannel.socket().setTcpNoDelay(true)         // line 721
            socketChannel.socket().setKeepAlive(true)          // line 722
            if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
              socketChannel.socket().setSendBufferSize(sendBufferSize)
            Some(socketChannel)
          } catch {
            case e: TooManyConnectionsException =>       
              info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
              close(endPoint.listenerName, socketChannel)
              None
            case e: ConnectionThrottledException => 
              val ip = socketChannel.socket.getInetAddress
              debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
              val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
              throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
              None
          }
        }
      

      The IOException propagates to the caller, `acceptNewConnections`, where it is caught at line 706. That handler only logs an error message, so the socketChannel whose configuration threw the IOException is never closed.

       

      //core/src/main/scala/kafka/network/SocketServer.scala
        private def acceptNewConnections(): Unit = {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel => 
                      ...
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)   // line 706
              }
            }
          }
        }
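
The leak described above can be reproduced outside Kafka with plain java.nio. The following is a minimal sketch, not Kafka code: `LeakSketch`, its simplified `accept`, and the injected `configure` function are hypothetical stand-ins, and the IOException is simulated rather than raised by a real socket operation. It shows that when the caller only logs the error, the accepted channel stays open and the client still believes it is connected.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.channels.{ServerSocketChannel, SocketChannel}

object LeakSketch {
  // Hypothetical, simplified accept(): there is no IOException handler,
  // so a failure while configuring the channel propagates to the caller.
  def accept(server: ServerSocketChannel,
             configure: SocketChannel => Unit): Option[SocketChannel] = {
    val socketChannel = server.accept()
    configure(socketChannel)
    Some(socketChannel)
  }

  def main(args: Array[String]): Unit = {
    // Loopback server on an ephemeral port, plus one connected client.
    val server = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))
    val client = SocketChannel.open(server.getLocalAddress)
    var leaked: SocketChannel = null
    try {
      // Simulated failure (assumption): stands in for configureBlocking/setTcpNoDelay/etc.
      accept(server, ch => { leaked = ch; throw new IOException("simulated failure") })
    } catch {
      // Mirrors the caller in the report: the error is logged and nothing else happens.
      case e: Throwable => println(s"Error while accepting connection: ${e.getMessage}")
    }
    println(s"server-side channel still open: ${leaked.isOpen}")
    println(s"client still connected: ${client.isConnected}")
    leaked.close(); client.close(); server.close()
  }
}
```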
      

      During testing, we found that this causes our Kafka clients to receive errors (InvalidReplicationFactorException) for more than 40 seconds when creating new topics. After that period, the clients can create new topics successfully.

      We verified that closing the socketChannel (socketChannel.close()) when the IOException occurs makes the symptoms disappear, so the clients no longer need to wait 40 seconds before working again.
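
The idea of closing the channel upon IOException might look roughly like the following. This is a minimal sketch under stated assumptions, not the patch that was merged: `AcceptSketch`, `acceptAndConfigure`, and the injected `configure` function are hypothetical names, and the failure is simulated. In Kafka itself the handler would presumably be an extra `case` in the existing `catch` block of `accept`.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.channels.{ServerSocketChannel, SocketChannel}

object AcceptSketch {
  // Hypothetical stand-in for Acceptor#accept: `configure` represents the
  // socket setup calls (configureBlocking, setTcpNoDelay, setKeepAlive, ...).
  def acceptAndConfigure(server: ServerSocketChannel,
                         configure: SocketChannel => Unit): Option[SocketChannel] = {
    val socketChannel = server.accept()
    try {
      configure(socketChannel)
      Some(socketChannel)
    } catch {
      case e: IOException =>
        // Close the accepted channel so the peer is not left holding a
        // connection that the server will never service.
        try socketChannel.close() catch { case _: IOException => () }
        None
    }
  }

  def main(args: Array[String]): Unit = {
    val server = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))
    val client = SocketChannel.open(server.getLocalAddress)
    var seen: SocketChannel = null
    // Simulated failure (assumption): any real setup call could throw here.
    val result = acceptAndConfigure(server, ch => { seen = ch; throw new IOException("simulated failure") })
    println(s"result=$result, accepted channel open=${seen.isOpen}")
    client.close(); server.close()
  }
}
```

Returning `None` keeps the existing contract of `accept`: the caller simply skips the failed connection, while the close ensures the client sees the connection drop promptly instead of waiting on a socket nobody reads.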

       

       

            People

              Assignee: Unassigned
              Reporter: Haoze Wu (functioner)
              Reviewer: David Jacot