  1. ZooKeeper
  2. ZOOKEEPER-4424

Re-throwing IOException in Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler#acceptConnections is not always needed

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.6.2
    • Fix Version/s: None
    • Component/s: server

    Description

      When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` accepts a new socket connection, it may throw an IOException at line 510, 514, 515, or 517 of `Leader.java`.

      The IOException at line 510 is discussed in ZOOKEEPER-4203 and https://github.com/apache/zookeeper/pull/1596 ; it triggers a concurrency bug. However, if the IOException occurs at line 514, 515, or 517, this complicated process can be avoided: we can simply catch the IOException and proceed to accept the next socket connection, without exiting the LearnerCnxAcceptorHandler thread. An exception in those operations (e.g., `setSoTimeout`) only indicates that the accepted socket connection has a problem; the `ServerSocket` itself still works. Therefore, the `LearnerCnxAcceptorHandler` can keep accepting socket connections with this `ServerSocket`.

      //zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
                  private void acceptConnections() throws IOException {
                      Socket socket = null;
                      boolean error = false;
                      try {
                          socket = serverSocket.accept();  // line 510
      
      
                          // start with the initLimit, once the ack is processed
                          // in LearnerHandler switch to the syncLimit
                          socket.setSoTimeout(self.tickTime * self.initLimit); // line 514
                          socket.setTcpNoDelay(nodelay);  // line 515
      
                          BufferedInputStream is = new BufferedInputStream(socket.getInputStream());  // line 517
                          LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
                          fh.start();
                      } catch (SocketException e) {
                          error = true;
                          if (stop.get()) {
                              LOG.warn("Exception while shutting down acceptor.", e);
                          } else {
                              throw e;
                          }
                      } catch (SaslException e) {
                          LOG.error("Exception while connecting to quorum learner", e);
                          error = true;
                      } catch (Exception e) {
                          error = true;
                          throw e;
                      } finally {
                          // Don't leak sockets on errors
                          if (error && socket != null && !socket.isClosed()) {
                              try {
                                  socket.close();
                              } catch (IOException e) {
                                  LOG.warn("Error closing socket: " + socket, e);
                              }
                          }
                      }
                  }
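
      The claim that a failure on one accepted socket leaves the `ServerSocket` usable can be checked in isolation. The following is a minimal sketch, not ZooKeeper code: the class and method names are hypothetical, and the per-connection failure is simulated by closing the accepted socket before calling `setSoTimeout`, which then throws a `SocketException`.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; it is not part of ZooKeeper.
public class AcceptAfterSocketError {

    /**
     * Returns true if the listener accepts a second connection after
     * configuring the first accepted socket failed with an IOException.
     */
    static boolean listenerSurvivesSocketError() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            boolean firstFailed = false;

            try (Socket client1 = new Socket(loopback, server.getLocalPort());
                 Socket accepted1 = server.accept()) {
                accepted1.close();             // assumption: simulates a broken connection
                accepted1.setSoTimeout(1000);  // throws SocketException on a closed socket
            } catch (IOException e) {
                firstFailed = true;            // only this connection is affected
            }

            // The same ServerSocket is reused for the next connection.
            try (Socket client2 = new Socket(loopback, server.getLocalPort());
                 Socket accepted2 = server.accept()) {
                return firstFailed && accepted2.isConnected() && !server.isClosed();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(listenerSurvivesSocketError());
    }
}
```

      In this sketch the listener is never touched by the failure, which is the property the proposal relies on for lines 514, 515, and 517.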
      

      We propose the following implementation instead. Its advantage is that an IOException on an individual socket no longer forces the `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` thread to exit, which avoids the overhead of re-election and potential concurrency bugs such as ZOOKEEPER-4203.

                  private void acceptConnections() throws IOException {
                      Socket socket = null;
                      boolean error = false;
                      try {
                          socket = serverSocket.accept();
                          BufferedInputStream is;
      
                          try {
                              // start with the initLimit, once the ack is processed
                              // in LearnerHandler switch to the syncLimit
                              socket.setSoTimeout(self.tickTime * self.initLimit);
                              socket.setTcpNoDelay(nodelay);
      
                              is = new BufferedInputStream(socket.getInputStream());
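                          } catch (IOException e) {
                              // Only this accepted socket is unusable; serverSocket can still accept
                              LOG.warn("Exception while configuring learner socket: " + socket, e);
                              error = true;
                              return;  // the finally block closes the socket
                          }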
      
                          LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
                          fh.start();
                      } catch (SocketException e) {
                          error = true;
                          if (stop.get()) {
                              LOG.warn("Exception while shutting down acceptor.", e);
                          } else {
                              throw e;
                          }
                      } catch (SaslException e) {
                          LOG.error("Exception while connecting to quorum learner", e);
                          error = true;  // keep closing the socket in the finally block, as the current code does
                      } catch (Exception e) {
                          error = true;
                          throw e;
                      } finally {
                          // Don't leak sockets on errors
                          if (error && socket != null && !socket.isClosed()) {
                              try {
                                  socket.close();
                              } catch (IOException e) {
                                  LOG.warn("Error closing socket: " + socket, e);
                              }
                          }
                      }
                  }
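
      The nested try block matters because `SocketException` is a subclass of `IOException`: in the current single try block, a failure in `setSoTimeout` reaches the outer `catch (SocketException e)` clause and is re-thrown unless the acceptor is stopping. The sketch below models only that control flow. All names are hypothetical, the `SocketSetup` interface stands in for the three per-socket calls, and the returned strings stand in for the real outcomes.

```java
import java.io.IOException;
import java.net.SocketException;

// Hypothetical model of the two catch layouts; it is not ZooKeeper code.
public class CatchRoutingSketch {

    /** Stand-in for setSoTimeout, setTcpNoDelay and getInputStream. */
    interface SocketSetup {
        void run() throws IOException;
    }

    /** Current layout: a setup failure reaches the clauses that re-throw. */
    static String currentShape(SocketSetup setup) {
        try {
            setup.run();
            return "handler started";
        } catch (SocketException e) {
            return "rethrown";  // models `throw e` when stop.get() is false
        } catch (Exception e) {
            return "rethrown";  // models the generic re-throwing clause
        }
    }

    /** Proposed layout: the inner try block intercepts setup failures first. */
    static String proposedShape(SocketSetup setup) {
        try {
            try {
                setup.run();
            } catch (IOException e) {
                return "connection dropped";  // the acceptor thread keeps running
            }
            return "handler started";
        } catch (Exception e) {
            return "rethrown";
        }
    }

    public static void main(String[] args) {
        SocketSetup failing = () -> {
            throw new SocketException("Socket is closed");
        };
        System.out.println(currentShape(failing));
        System.out.println(proposedShape(failing));
    }
}
```

      With a failing setup step, the first layout takes the re-throwing path and the second drops only the affected connection. A failure in `serverSocket.accept()` itself stays outside the inner block, so the handling discussed in ZOOKEEPER-4203 is unchanged.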
      

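      Other projects already use this pattern. For example, Kafka's `SocketServer` closes the accepted channel and returns `None` when configuring it fails, instead of propagating the exception (https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740 ):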

        /**
         * Accept a new connection
         */
        private def accept(key: SelectionKey): Option[SocketChannel] = {
          val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
          val socketChannel = serverSocketChannel.accept()
          try {
            connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
            configureAcceptedSocketChannel(socketChannel)
            Some(socketChannel)
          } catch {
            case e: TooManyConnectionsException =>
              info(...)
              close(endPoint.listenerName, socketChannel)
              None
            case e: ConnectionThrottledException =>
              // ...
              None
            case e: IOException =>
              error(...)
              close(endPoint.listenerName, socketChannel)
              None
          }
        }
      
        /**
         * Close `channel` and decrement the connection count.
         */
        def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
          if (channel != null) {
            // ...
            closeSocket(channel)
          }
        }
      
        protected def closeSocket(channel: SocketChannel): Unit = {
          CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
          CoreUtils.swallow(channel.close(), this, Level.ERROR)
        } 

            People

              Assignee: Unassigned
              Reporter: Haoze Wu (functioner)
              Votes: 0
              Watchers: 2

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m