FLUME-2752: Flume AvroSource leaks memory when startup fails, eventually causing an OutOfMemoryError

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.8.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      If the Flume agent is configured with a nonexistent IP address for the Avro source, the following exception occurs:
      2015-07-21 19:57:47,054 | ERROR | [lifecycleSupervisor-1-2] | Unable to start EventDrivenSourceRunner: { source:Avro source avro_source_21155: { bindAddress: 51.196.27.32, port: 21155 } } - Exception follows. | org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)
      org.jboss.netty.channel.ChannelException: Failed to bind to: /51.196.27.32:21155
      at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:297)
      at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
      at org.apache.flume.source.AvroSource.start(AvroSource.java:294)
      at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
      at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.net.BindException: Cannot assign requested address
      at sun.nio.ch.Net.bind0(Native Method)
      at sun.nio.ch.Net.bind(Net.java:437)
      at sun.nio.ch.Net.bind(Net.java:429)
      at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
      at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
      at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:140)
      at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
      at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
      at org.jboss.netty.channel.Channels.bind(Channels.java:569)
      at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:189)
      at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:342)
      at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
      at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
      at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
      at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
      at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:276)

      If this exception keeps recurring for about 2 hours and the agent JVM is configured with 4 GB of memory (-Xmx), an OutOfMemoryError occurs.

          Activity

          hudson Hudson added a comment -

          FAILURE: Integrated in Jenkins build Flume-trunk-hbase-1 #253 (See https://builds.apache.org/job/Flume-trunk-hbase-1/253/)
          FLUME-2752. Fix AvroSource startup resource leaks (denes: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=b5e5ba50f4333272b9e2f2be2b32027e667f32e2)

          • (edit) flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
          • (edit) flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
          denes Denes Arvay added a comment -

          Thank you Attila Simon for the patch, I have committed it.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flume/pull/141

          jira-bot ASF subversion and git services added a comment -

          Commit b5e5ba50f4333272b9e2f2be2b32027e667f32e2 in flume's branch refs/heads/trunk from Attila Simon
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=b5e5ba5 ]

          FLUME-2752. Fix AvroSource startup resource leaks

          Cleanup after Netty initialisation fails (call this.stop())

          • Make sure this.stop() releases the resources and leaves the component in
            a LifecycleAware.STOPPED state
          • Added junit test to cover the invalid host scenario
          • Added junit test to cover the used port scenario

          This closes #141.

          Reviewers: Denes Arvay

          (Attila Simon via Denes Arvay)
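
The approach described in this commit message (call stop() when startup fails, and make stop() release everything and leave the component stopped) can be illustrated with a minimal sketch. This is not the committed AvroSource code: the class, the State enum, and the failOnBind flag are hypothetical names used only for illustration, and a plain ExecutorService stands in for the Netty resources.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; names here are illustrative and not Flume's API.
class StartCleanupSketch {
  enum State { IDLE, STARTED, STOPPED }

  static class Component {
    private final boolean failOnBind;  // simulates "Failed to bind"
    private ExecutorService pool;      // resource acquired during start()
    private State state = State.IDLE;

    Component(boolean failOnBind) {
      this.failOnBind = failOnBind;
    }

    void start() {
      try {
        pool = Executors.newSingleThreadExecutor();
        if (failOnBind) {
          throw new IllegalStateException("simulated bind failure");
        }
        state = State.STARTED;
      } catch (RuntimeException e) {
        // Startup failed part-way: release whatever was acquired, then rethrow.
        stop();
        throw e;
      }
    }

    // Safe to call after a partial start and safe to call more than once.
    void stop() {
      if (pool != null) {
        pool.shutdownNow();
        pool = null;
      }
      state = State.STOPPED;
    }

    State state() {
      return state;
    }

    boolean resourcesReleased() {
      return pool == null;
    }
  }
}
```

The null check in stop() is what makes it usable as a cleanup path: start() may fail before every resource has been created, so stop() has to tolerate a partially initialised component.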

          githubbot ASF GitHub Bot added a comment -

          GitHub user simonati opened a pull request:

          https://github.com/apache/flume/pull/141

          FLUME-2752 Fix AvroSource startup resource leaks

          Cleanup after Netty initialisation fails (call this.stop())
          Make sure this.stop() releases the resources and leaves the component in
          a LifecycleAware.STOPPED state
          Added junit tests to cover the invalid host and used port scenarios

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/simonati/flume FLUME-2752

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flume/pull/141.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #141


          commit b22796fab4a9abd9edb85ae0d966ffac1785a1e5
          Author: Attila Simon <sati@cloudera.com>
          Date: 2017-06-29T06:21:44Z

          FLUME-2752 Fix AvroSource startup resource leaks

          Cleanup after Netty initialisation fails (call this.stop())
          Make sure this.stop() releases the resources and leaves the component in
          a LifecycleAware.STOPPED state
          Added junit test to cover the invalid host scenario
          Added junit test to cover the used port scenario

          Change-Id: Ia6309b597166fc4061864805fc4211c57f01c58e

          commit 4b479013e21f77ef68da2a4f71462a7fda4ba932
          Author: Attila Simon <sati@cloudera.com>
          Date: 2017-06-29T07:13:15Z

          Update rest of junit to pass

          Change-Id: If9e4bf672359e7d5817cfc70438004b4358c9d10

          commit b9ba593299ed87a0fe0db0183398a00409311546
          Author: Attila Simon <sati@cloudera.com>
          Date: 2017-06-29T07:20:40Z

          fix indentation

          Change-Id: I37242fbb89f85ec8c31016d26c939bff82aee8f1


          sati Attila Simon added a comment -

          I experienced similar issues on startup. If there is any exception during startup then the resources won't be released (memory, thread pool, open files, etc.). I have an implementation which conforms to the idea proposed here: call this.stop() if there was any error during startup, and make sure that this.stop() releases the resources. I plan to open a pull request for this change soon.

          yinghua_zh yinghua_zh added a comment -

          If the NettyServer fails to start, we should invoke its close method, which releases the resources:

          try {
            server.start();
          } catch (Exception e1) {
            logger.error("start server failed:", e1);
            server.close();
            throw new RuntimeException(e1);
          }
          hshreedharan Hari Shreedharan added a comment -

          Good catch! We should make sure we shut down the thread pool if the start fails.

          yinghua_zh yinghua_zh added a comment - - edited

          @Override
          public void start() {
            logger.info("Starting {}...", this);

            Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
            NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
            ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();

            connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
            server.start();
            sourceCounter.start();
            super.start();
            final NettyServer srv = (NettyServer) server;
            connectionCountUpdater.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                sourceCounter.setOpenConnectionCount(
                    Long.valueOf(srv.getNumActiveConnections()));
              }
            }, 0, 60, TimeUnit.SECONDS);

            logger.info("Avro source {} started.", getName());
          }
          I have modified the code as follows. Do you agree?

          @Override
          public void start() {
            logger.info("Starting {}...", this);

            Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
            NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
            ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
            try {
              server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
                  socketChannelFactory, pipelineFactory, null);
            } catch (Exception ex) {
              // NettyServer construction failed: release the resources held by the
              // channel factory before rethrowing.
              try {
                socketChannelFactory.releaseExternalResources();
              } catch (Exception e1) {
                logger.error("Failed to release external resources:", e1);
              }
              throw new RuntimeException(ex);
            }

            connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
            server.start();
            sourceCounter.start();
            super.start();
            final NettyServer srv = (NettyServer) server;
            connectionCountUpdater.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                sourceCounter.setOpenConnectionCount(
                    Long.valueOf(srv.getNumActiveConnections()));
              }
            }, 0, 60, TimeUnit.SECONDS);

            logger.info("Avro source {} started.", getName());
          }

          yinghua_zh yinghua_zh added a comment - - edited

          The Avro source start method starts a thread pool, but NettyServer initialisation then fails and the thread pool is never shut down, so its memory is leaked. About 3 seconds later the Avro source is started again: it starts another thread pool, NettyServer initialisation fails again, and that thread pool is not shut down either. As this repeats, the leaked pools accumulate until an OutOfMemoryError finally occurs.
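
The accumulation described above can be reproduced in miniature. The following is a simplified sketch, not Flume code: RetryLeakSketch and all of its methods are hypothetical, bind() simply throws to imitate the bind failure, and an ordinary single-thread executor stands in for the pools that the real source creates. It contrasts a start method that abandons its pool on failure with one that shuts the pool down before rethrowing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a source whose start() allocates a thread pool
// and then fails while binding; this is not Flume's actual AvroSource code.
class RetryLeakSketch {
  // Every pool ever created, so pools that were never shut down can be counted.
  static final List<ExecutorService> created = new ArrayList<>();

  static ExecutorService newPool() {
    // Daemon threads are used so this demo cannot keep the JVM alive.
    ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
    });
    pool.submit(() -> { });  // forces the worker thread to be created
    created.add(pool);
    return pool;
  }

  // Simulated bind failure, standing in for "Failed to bind to: ...".
  static void bind() {
    throw new IllegalStateException("simulated bind failure");
  }

  // Leaky variant: the pool is abandoned when bind() throws.
  static void startLeaky() {
    ExecutorService pool = newPool();
    bind();
  }

  // Fixed variant: the pool is shut down before the exception propagates.
  static void startWithCleanup() {
    ExecutorService pool = newPool();
    try {
      bind();
    } catch (RuntimeException e) {
      pool.shutdownNow();
      throw e;
    }
  }

  // Calls start repeatedly and swallows failures, like a retrying supervisor.
  static void retry(Runnable start, int attempts) {
    for (int i = 0; i < attempts; i++) {
      try {
        start.run();
      } catch (RuntimeException expected) {
        // a real supervisor would log the error and retry after a delay
      }
    }
  }

  // Number of pools that were created but never shut down.
  static long leakedPools() {
    return created.stream().filter(p -> !p.isShutdown()).count();
  }
}
```

With the leaky variant, leakedPools() grows by one for each failed attempt; with the cleanup variant it stays at zero. At one retry every few seconds, a leak of this shape adds up to thousands of abandoned pools within a couple of hours.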


            People

            • Assignee:
              sati Attila Simon
              Reporter:
              yinghua_zh yinghua_zh