Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.4.0
Fix Version/s: None
Component/s: None
Environment: Flume 1.4.0, Java 1.7.0_10
Description
When LoadBalancingLog4jAppender loses its connection to Flume, every subsequent attempt to log spawns NIO threads that are never shut down. The threads come from NettyAvroRpcClient's attempt to instantiate a NettyTransceiver. When the transceiver fails to connect in its constructor, an exception is thrown, but the NIO threads that the NioClientSocketChannelFactory spawned in its Boss and Worker pools are never released. As a result, each failed attempt to send a log event leaks another set of threads.
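To make the mechanism concrete, here is a minimal, dependency-free sketch (Java 8+). It does not use Flume or Netty: `FailingTransceiver` is a hypothetical stand-in for `NettyTransceiver` that starts a long-running task on the executor it receives (mimicking a Netty boss thread) and then throws an `IOException`, as a failed connect would. Because the executor is created inline as a constructor argument, nothing can shut it down afterwards, so each failed attempt leaves one more thread running.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class InlineLeakDemo {
  // Every thread handed out by the factory below, so the demo can count survivors.
  static final List<Thread> created = new CopyOnWriteArrayList<>();

  // Daemon threads only so that the demo JVM can exit despite the leak.
  static ThreadFactory recording(String name) {
    return r -> {
      Thread t = new Thread(r, name);
      t.setDaemon(true);
      created.add(t);
      return t;
    };
  }

  static long liveThreads() {
    return created.stream().filter(Thread::isAlive).count();
  }

  // Hypothetical stand-in for NettyTransceiver: starts a blocking task on the
  // executor it was given, then fails to "connect" and throws.
  static final class FailingTransceiver {
    FailingTransceiver(ExecutorService boss) throws IOException {
      CountDownLatch never = new CountDownLatch(1);
      boss.execute(() -> {
        try {
          never.await(); // blocks until the thread is interrupted
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      throw new IOException("connection refused (simulated)");
    }
  }

  // Same shape as the inline call: the executor exists only as a
  // constructor argument, so it is unreachable once the constructor throws.
  static void attemptInline() {
    try {
      new FailingTransceiver(Executors.newCachedThreadPool(recording("demo-boss")));
    } catch (IOException expected) {
      // No reference to the executor here; its thread keeps running.
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      attemptInline();
    }
    System.out.println("leaked threads: " + liveThreads());
  }
}
```

Running `main` prints `leaked threads: 3`: one live thread per failed attempt.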
The code in question, from NettyAvroRpcClient.java:
transceiver = new NettyTransceiver(this.address,
    new NioClientSocketChannelFactory(
        Executors.newCachedThreadPool(new TransceiverThreadFactory(
            "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
        Executors.newCachedThreadPool(new TransceiverThreadFactory(
            "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
    tu.toMillis(timeout));
Because all of the arguments are constructed inline, nothing holds a reference to them that could be used to clean them up if the constructor throws an exception.
I am proposing that we assign these objects to local variables first so that, if the NettyTransceiver constructor throws an exception, we can still release the threads that were spawned.
Something along the lines of:
callTimeoutPool = Executors.newCachedThreadPool(
    new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
// Create the factory before the try block (instead of inline in the
// NettyTransceiver constructor call) so that the catch block can release it.
NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
    Executors.newCachedThreadPool(new TransceiverThreadFactory(
        "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
    Executors.newCachedThreadPool(new TransceiverThreadFactory(
        "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
try {
  transceiver = new NettyTransceiver(this.address, factory, tu.toMillis(timeout));
  avroClient = SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, transceiver);
} catch (IOException ex) {
  /* Graceful shutdown (from http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelFactory.html)
     To shut down a network application service which is managed by a factory. you should
     follow the following steps: close all channels created by the factory and their child
     channels usually using ChannelGroup.close(), and call releaseExternalResources(). */
  // Releasing the factory also shuts down the Boss and I/O Worker executors passed to it.
  factory.releaseExternalResources();
  throw new FlumeException(this + ": RPC connection error", ex);
}
setState(ConnState.READY);
} // closes the enclosing method (not shown)
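The shape of the proposed fix can be checked the same way without Netty: hold the executor in a local variable and release it if the constructor throws. This is a hypothetical sketch (Java 8+); `FailingTransceiver` and `attemptWithCleanup` are illustrative names, and `ExecutorService.shutdownNow()` plays the role that `factory.releaseExternalResources()` plays in the proposal.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class ReleaseOnFailureDemo {
  // Every thread handed out by the factory below, so the demo can count survivors.
  static final List<Thread> created = new CopyOnWriteArrayList<>();

  static ThreadFactory recording(String name) {
    return r -> {
      Thread t = new Thread(r, name);
      t.setDaemon(true);
      created.add(t);
      return t;
    };
  }

  static long liveThreads() {
    return created.stream().filter(Thread::isAlive).count();
  }

  // Hypothetical stand-in for NettyTransceiver: starts a blocking task, then fails.
  static final class FailingTransceiver {
    FailingTransceiver(ExecutorService boss) throws IOException {
      CountDownLatch never = new CountDownLatch(1);
      boss.execute(() -> {
        try {
          never.await(); // blocks until the thread is interrupted
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      throw new IOException("connection refused (simulated)");
    }
  }

  // Proposed shape: keep a reference to the executor and release it on failure.
  static void attemptWithCleanup() {
    ExecutorService boss = Executors.newCachedThreadPool(recording("demo-boss"));
    try {
      new FailingTransceiver(boss);
    } catch (IOException ex) {
      boss.shutdownNow(); // stands in for factory.releaseExternalResources()
      try {
        boss.awaitTermination(5, TimeUnit.SECONDS);
        for (Thread t : created) {
          t.join(5000); // wait until each worker thread has actually exited
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      // The proposal rethrows at this point (as a FlumeException); the demo just returns.
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      attemptWithCleanup();
    }
    System.out.println("live threads after 3 failed attempts: " + liveThreads());
  }
}
```

Running `main` prints `live threads after 3 failed attempts: 0`. The `shutdownNow()` call matters here: plain `shutdown()` would not interrupt the blocked task, so the thread would survive just as a busy boss thread would.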