Flume / FLUME-2221

NIO Worker threads remain open when connection fails


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: Client SDK
    • Labels: None
    • Environment: Flume 1.4.0, Java 1.7.0_10

    Description

      When LoadBalancingLog4jAppender loses its connection to Flume, each subsequent attempt to log spawns NIO threads and leaves them open. This comes from NettyAvroRpcClient's attempt to instantiate a NettyTransceiver. When the transceiver fails to connect in its constructor, an exception is thrown, but the NIO threads spawned by the NioClientSocketChannelFactory in the Boss and Worker pools are never closed. As a result, more threads are started on every attempt to send a log event.

      The code in question from NettyAvroRpcClient.java:

      transceiver = new NettyTransceiver(this.address,
          new NioClientSocketChannelFactory(
              Executors.newCachedThreadPool(new TransceiverThreadFactory(
                  "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
              Executors.newCachedThreadPool(new TransceiverThreadFactory(
                  "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
          tu.toMillis(timeout));
      

      Because all of the arguments are instantiated inline, there is no reference from which to clean them up if the constructor throws an exception.

      I am proposing that we instantiate these objects explicitly so that, if the NettyTransceiver constructor throws an exception (leaving transceiver unassigned), we can clean up the threads that were spawned.

      Something along the lines of:

        callTimeoutPool = Executors.newCachedThreadPool(
            new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
        // Create the factory outside the NettyTransceiver constructor call (and
        // outside the try block) so that it is definitely assigned and can be
        // released in the catch block.
        NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(new TransceiverThreadFactory(
                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
            Executors.newCachedThreadPool(new TransceiverThreadFactory(
                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
        try {
          transceiver = new NettyTransceiver(this.address, factory, tu.toMillis(timeout));
          avroClient =
              SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
                  transceiver);
        } catch (IOException ex) {
          /* Graceful shutdown (from http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelFactory.html)

             To shut down a network application service which is managed by a factory, you should follow the following steps:
             close all channels created by the factory and their child channels, usually using ChannelGroup.close(), and
             call releaseExternalResources(). */

          // Release the Boss and I/O Worker threads spawned for the failed connection.
          factory.releaseExternalResources();
          throw new FlumeException(this + ": RPC connection error", ex);
        }
        setState(ConnState.READY);
      }
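
      The same pattern can be tried out without Netty or Flume on the classpath. The following is only a minimal sketch using the JDK alone: ConstructorCleanupSketch and FailingTransceiver are hypothetical names invented for illustration (FailingTransceiver stands in for NettyTransceiver), and ExecutorService.shutdown() stands in for releaseExternalResources(). It shows that holding a reference to the pool before calling a constructor that may throw is what makes cleanup possible in the catch block.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConstructorCleanupSketch {

  /** Hypothetical stand-in for NettyTransceiver: uses the pool, then fails to connect. */
  static final class FailingTransceiver {
    FailingTransceiver(ExecutorService workers) throws IOException {
      workers.submit(() -> { });  // forces the pool to spawn a worker thread
      throw new IOException("connection refused");
    }
  }

  /** Attempts a connection and returns the pool so the caller can inspect its state. */
  static ExecutorService connect() {
    // Keep a reference to the pool instead of creating it inline in the
    // constructor call; otherwise it would be unreachable after a failure.
    ExecutorService workers = Executors.newCachedThreadPool();
    try {
      new FailingTransceiver(workers);
    } catch (IOException ex) {
      // Assumption: shutdown() plays the role of releaseExternalResources() here.
      workers.shutdown();
    }
    return workers;
  }

  public static void main(String[] args) {
    System.out.println("pool shut down after failure: " + connect().isShutdown());
  }
}
```

      If the pool were created inline as a constructor argument instead, nothing would be left to call shutdown() on, and its worker thread would linger after every failed attempt, which is the behavior described in this report.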
      
      

          People

            Assignee: Unassigned
            Reporter: matt fisher (mfish04)