Flume / FLUME-2905

NetcatSource - Socket not closed when an exception is encountered during start() leading to file descriptor leaks

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.8.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      During the flume agent start-up, the flume configuration containing the NetcatSource is parsed and the source's start() is called. If binding the channel's socket to a local address (so that it can listen for connections) fails, the following exception is thrown, but the socket opened just before the bind is not closed.

      2016-05-01 03:04:37,273 ERROR org.apache.flume.lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:src-1,state:IDLE} } - Exception follows.
      org.apache.flume.FlumeException: java.net.BindException: Address already in use
              at org.apache.flume.source.NetcatSource.start(NetcatSource.java:173)
              at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
              at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.net.BindException: Address already in use
              at sun.nio.ch.Net.bind0(Native Method)
              at sun.nio.ch.Net.bind(Net.java:444)
              at sun.nio.ch.Net.bind(Net.java:436)
              at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
              at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
              at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
              at org.apache.flume.source.NetcatSource.start(NetcatSource.java:167)
              ... 9 more
      

      The LifecycleSupervisor then calls the source's start() again, which opens another socket that is not closed either, and so on. Each failed attempt leaks one file descriptor (socket).
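The leak comes down to a missing close() on the failure path. Below is a minimal sketch of that pattern in plain java.nio; it is not the actual FLUME-2905 patch. The helper name `bindOrClose` is hypothetical, and an ephemeral loopback port held by a second channel stands in for the busy port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindOrClose {

  // Hypothetical helper: bind an already-open channel, or close it if binding fails.
  public static ServerSocketChannel bindOrClose(ServerSocketChannel channel, SocketAddress address)
      throws IOException {
    try {
      channel.bind(address);
      return channel;
    } catch (IOException e) {
      // Without this close(), the socket created by ServerSocketChannel.open() stays
      // allocated even though the bind failed: one leaked descriptor per retry.
      try {
        channel.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  public static void main(String[] args) throws IOException {
    // Occupy an ephemeral loopback port, standing in for the DataNode on 50010.
    try (ServerSocketChannel occupier = ServerSocketChannel.open()) {
      occupier.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      SocketAddress busy = occupier.getLocalAddress();

      ServerSocketChannel candidate = ServerSocketChannel.open();
      try {
        bindOrClose(candidate, busy);
      } catch (IOException expected) {
        System.out.println("bind failed: " + expected + ", channel open = " + candidate.isOpen());
      }
    }
  }
}
```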

      This can easily be reproduced as follows:
      1. Set Netcat as the source in the flume agent configuration.
      2. Set the bind port for the netcat source to a port that is already in use. For example, I used 50010, the port of the DataNode's Xceiver (data transfer) protocol used by the HDFS service.
      3. Start the flume agent and run "lsof -p <flume_process_id> | wc -l" repeatedly. The number of file descriptors keeps growing because of the leaked sockets, which lsof reports with errors like "can't identify protocol".
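The lsof check in step 3 can be approximated inside the JVM. This sketch assumes a Unix-like OpenJDK/HotSpot runtime, where `com.sun.management.UnixOperatingSystemMXBean.getOpenFileDescriptorCount()` is available; `fdGrowth` is a hypothetical helper that simulates repeated failed start() attempts against a busy loopback port, with and without closing the channel on failure.

```java
import com.sun.management.UnixOperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;

public class FdLeakCheck {

  // The JVM's own open-descriptor count: a rough in-process analogue of
  // `lsof -p <pid> | wc -l` (Unix-like JVMs only).
  public static long openFds() {
    return ((UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean())
        .getOpenFileDescriptorCount();
  }

  // Simulates `attempts` failed start() calls and returns the descriptor growth.
  public static long fdGrowth(int attempts, boolean closeOnFailure) throws IOException {
    List<ServerSocketChannel> leaked = new ArrayList<>();
    try (ServerSocketChannel occupier = ServerSocketChannel.open()) {
      occupier.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      SocketAddress busy = occupier.getLocalAddress();
      long before = openFds();
      for (int i = 0; i < attempts; i++) {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
          channel.bind(busy);
        } catch (IOException bindFailure) {
          if (closeOnFailure) {
            channel.close(); // the fix: release the socket on the failure path
            continue;
          }
        }
        // Reached only when the channel was not closed: keep a reference so the
        // count is not affected by garbage collection; closed in the finally block.
        leaked.add(channel);
      }
      return openFds() - before;
    } finally {
      for (ServerSocketChannel channel : leaked) {
        channel.close(); // tidy up so the demo itself does not leak
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("growth without close: " + fdGrowth(20, false));
    System.out.println("growth with close:    " + fdGrowth(20, true));
  }
}
```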

      Attachments

      1. FLUME-2905-6.patch (4 kB, Siddharth Ahuja)
      2. FLUME-2905-5.patch (4 kB, Siddharth Ahuja)
      3. FLUME-2905-4.patch (4 kB, Siddharth Ahuja)
      4. FLUME-2905-3.patch (4 kB, Siddharth Ahuja)
      5. FLUME-2905-2.patch (4 kB, Siddharth Ahuja)
      6. FLUME-2905-1.patch (4 kB, Siddharth Ahuja)
      7. FLUME-2905-0.patch (4 kB, Siddharth Ahuja)


          Activity

          Hudson added a comment:

          FAILURE: Integrated in Jenkins build Flume-trunk-hbase-1 #254 (See https://builds.apache.org/job/Flume-trunk-hbase-1/254/)
          FLUME-2905. Fix NetcatSource file descriptor leak if startup fails (denes: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=5e9cfef2b26f1960601d08d571e4c85c269503af)

          • (edit) flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
          • (edit) flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
          Denes Arvay added a comment:

          Thank you Siddharth Ahuja for filing the ticket and providing the patch, thanks Attila Simon for the review.

          ASF subversion and git services added a comment:

          Commit 5e9cfef2b26f1960601d08d571e4c85c269503af in flume's branch refs/heads/trunk from Siddharth Ahuja
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=5e9cfef ]

          FLUME-2905. Fix NetcatSource file descriptor leak if startup fails

          This patch fixes the issue in NetcatSource which occurs if there is a problem
          while binding the channel's socket to a local address and leads to a file descriptor
          (socket) leak.

          Reviewers: Attila Simon, Denes Arvay

          (Siddharth Ahuja via Denes Arvay)

          Siddharth Ahuja added a comment (edited):

          Hey Attila Simon, thanks for your reply and your comments on the review. I have uploaded a new patch (patch #6) to this JIRA based on your comments. I hope this is enough to get the fix in! Thanks in advance for reviewing.

          Attila Simon added a comment:

          It looks good, thanks! You can check others, e.g.: https://reviews.apache.org/r/48161/

          I also left some comments there. You can upload your corrections there; I think attaching the final version of the patch here would then be enough.

          Siddharth Ahuja added a comment:

          Hey Attila Simon, I just added the review request on Review Board. I'm not sure I followed the process correctly, so it would be great if you could check. Thanks in advance!

          Siddharth Ahuja added a comment (edited):

          Hi Attila Simon, thanks a lot for your review.

          Here are my answers to your points:

          • Point 1, "calling stop() after writing out the exception": I have moved the stop() call after the exception is logged, just before the exception is thrown.
          • Point 2: We should probably have a dedicated JIRA for removing the "return" statement from the stop() method. That is a different issue from the one this JIRA fixes, which is preventing socket leaks when a port is already bound. A separate JIRA would also make tracking easier; otherwise any issues arising from the removal would be discussed here, sidetracking from the original issue, which is potentially already resolved. What do you think?
          • Point 3: I believe there is nothing for me to do here.

          I have created a new patch, FLUME-2905-5.patch, for your review. I have tested it to make sure there are no leaks, and the JUnit tests also pass for me.

          I will try to add this to Review Board soon (I haven't done that yet).

          Thanks once again.
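Point 1 concerns the order of operations on start()'s failure path. A hedged sketch of that ordering (log the cause, call stop() to release the socket, then throw), using a hypothetical `SketchSource` and a hypothetical `StartException` in place of NetcatSource and FlumeException; the `events` list exists only to make the order observable.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;

public class StartFailureOrdering {

  // Hypothetical stand-in for org.apache.flume.FlumeException.
  public static class StartException extends RuntimeException {
    public StartException(Throwable cause) { super(cause); }
  }

  // Hypothetical, much-simplified source; not the real NetcatSource.
  public static class SketchSource {
    public final List<String> events = new ArrayList<>(); // what happened, in order
    private ServerSocketChannel serverSocket;

    public void start(SocketAddress address) {
      try {
        serverSocket = ServerSocketChannel.open();
        serverSocket.bind(address);
      } catch (IOException e) {
        events.add("log: unable to bind to " + address + ": " + e); // 1. log the cause
        stop();                                                         // 2. release the socket
        throw new StartException(e);                                    // 3. then throw
      }
      events.add("started");
    }

    public void stop() {
      if (serverSocket != null) {
        try {
          serverSocket.close();
        } catch (IOException e) {
          events.add("log: close failed: " + e);
        }
      }
      events.add("stopped");
    }

    public boolean socketOpen() {
      return serverSocket != null && serverSocket.isOpen();
    }
  }

  public static void main(String[] args) throws IOException {
    try (ServerSocketChannel occupier = ServerSocketChannel.open()) {
      occupier.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      SketchSource source = new SketchSource();
      try {
        source.start(occupier.getLocalAddress());
      } catch (StartException expected) {
        System.out.println(source.events + ", socket open = " + source.socketOpen());
      }
    }
  }
}
```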

          Attila Simon added a comment:

          Hi Siddharth Ahuja,
          JUnit tests passed for me, but I haven't run them exhaustively yet. I checked FLUME-2905-4.patch and have some minor suggestions. Would you mind uploading it to https://reviews.apache.org/ for a code review? Please let me know if you have trouble with it (I'm also happy to upload it there for you).

          In a nutshell:

          • I would recommend calling stop() after writing out the exception.
          • I would recommend removing the return statement from the part of stop() that catches an Exception from closing the socket. That return currently prevents the rest of stop() from executing, including super.stop(), so the lifecycle state is never set.
          • The test currently checks that stop() was executed properly by expecting the source to be in the LifecycleState.STOP state. I think that is fine, since the fact that NetcatSource opens a ServerSocket is internal to it. What we have to check is whether the source is stopped on error, and the test does check that.
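The second suggestion can be illustrated with a hypothetical `SketchSource` whose base class records the lifecycle state in stop(), loosely modeled on AbstractSource; the `LifecycleState` enum here is a local stand-in, not Flume's class. The close failure is logged and stop() carries on, so super.stop() always runs.

```java
import java.io.Closeable;
import java.io.IOException;

public class StopWithoutEarlyReturn {

  // Local stand-in modeled on Flume's lifecycle states.
  public enum LifecycleState { IDLE, START, STOP, ERROR }

  // Hypothetical base class standing in for AbstractSource: stop() records the state.
  public static class BaseSource {
    private LifecycleState state = LifecycleState.IDLE;

    public void stop() { state = LifecycleState.STOP; }

    public LifecycleState getLifecycleState() { return state; }
  }

  public static class SketchSource extends BaseSource {
    public Closeable serverSocket;

    @Override
    public void stop() {
      if (serverSocket != null) {
        try {
          serverSocket.close();
        } catch (IOException e) {
          // Log and carry on; a `return` here would skip super.stop() below.
          System.err.println("Unable to close socket: " + e.getMessage());
        }
        serverSocket = null;
      }
      super.stop(); // always reached, so the lifecycle state becomes STOP
    }
  }

  public static void main(String[] args) {
    SketchSource source = new SketchSource();
    source.serverSocket = () -> { throw new IOException("simulated close failure"); };
    source.stop();
    System.out.println(source.getLifecycleState()); // STOP
  }
}
```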

          As a side note: it seems that startSource() doesn't behave as expected (it should pick a new port on error). This is a separate issue, so I think it should be fixed in a separate JIRA/PR.

          Siddharth Ahuja added a comment:

          Hi Denes Arvay, Jarek Jarcec Cecho, I have just updated the JUnit test again in my latest patch. Would you have time to review it for me, please? Hopefully the JUnit tests work this time around! Thank you in advance!

          Siddharth Ahuja added a comment:

          Hi Denes Arvay, I have just created another patch (patch #3) with a minor change to my JUnit tests. The tests pass in my local build, so it would be great if you could test them again. Thanks in advance for your help!

          Jarek Jarcec Cecho added a comment:

          I'm still observing test failure:

          Results :
          
          Failed tests:   testSourceStoppedOnFlumeException(org.apache.flume.source.TestNetcatSource): Flume exception thrown as port already in use expected:<true> but was:<false>
          
          Siddharth Ahuja added a comment:

          I have attached a new patch, FLUME-2905-2, for the above. Thanks again for the review.

          Siddharth Ahuja added a comment:

          Thanks Jarek Jarcec Cecho, I have found the issue. It has to do with how I set up the source in TestNetcatSource.java. I created a new NetcatSource in my test method without setting up its channelProcessor manually, so the test failed for you (although, for some reason, it worked fine for me in both my Eclipse and command-line environments). I have modified the test so that it reuses the existing source object, whose channelProcessor is already set up by the test suite's setup() method. The point of the test is to ensure that when we start a source that tries to bind to a port which is already bound, the source is stopped. Creating a "new" source in the test is not important, so we can reuse the existing one.
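The "No channel processor configured" error reported from Jarek Jarcec Cecho's build comes from a precondition check in AbstractSource.start() (Guava's Preconditions.checkState, per that stack trace). A simplified, hypothetical sketch of why a freshly constructed source fails that check while one prepared by setup() passes; plain Java replaces Guava, and the classes are stand-ins rather than Flume's.

```java
public class ChannelProcessorCheck {

  // Hypothetical stand-in for Flume's ChannelProcessor.
  public interface ChannelProcessor {}

  // Modeled on the check behind "No channel processor configured":
  // start() refuses to run until a channel processor has been set.
  public static class SketchSource {
    private ChannelProcessor channelProcessor;
    private boolean started;

    public void setChannelProcessor(ChannelProcessor processor) {
      this.channelProcessor = processor;
    }

    public void start() {
      if (channelProcessor == null) {
        throw new IllegalStateException("No channel processor configured");
      }
      started = true;
    }

    public boolean isStarted() { return started; }
  }

  public static void main(String[] args) {
    SketchSource fresh = new SketchSource();
    try {
      fresh.start(); // like a new NetcatSource created inside the test method
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // No channel processor configured
    }

    SketchSource configured = new SketchSource();
    configured.setChannelProcessor(new ChannelProcessor() {});
    configured.start(); // like the source prepared by the test suite's setup()
    System.out.println(configured.isStarted()); // true
  }
}
```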

          Jarek Jarcec Cecho added a comment:

          My local build is failing with:

          Tests run: 8, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.481 sec <<< FAILURE!
          testSourceStoppedOnFlumeException(org.apache.flume.source.TestNetcatSource)  Time elapsed: 6 sec  <<< ERROR!
          java.lang.IllegalStateException: No channel processor configured
          	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
          	at org.apache.flume.source.AbstractSource.start(AbstractSource.java:45)
          	at org.apache.flume.source.NetcatSource.start(NetcatSource.java:192)
          	at org.apache.flume.source.TestNetcatSource.startSource(TestNetcatSource.java:344)
          	at org.apache.flume.source.TestNetcatSource.testSourceStoppedOnFlumeException(TestNetcatSource.java:314)
          	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          	at java.lang.reflect.Method.invoke(Method.java:498)
          	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
          	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
          	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
          	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
          	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
          	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
          	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
          	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
          	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
          	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
          	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
          	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
          	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
          	at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
          	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
          	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
          	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
          	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          	at java.lang.reflect.Method.invoke(Method.java:498)
          	at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
          	at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
          	at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
          	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
          	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
          

          Wondering if you saw that in your environment as well?

          Siddharth Ahuja added a comment:

          Thanks Jarek Jarcec Cecho, I've attached a new patch; hopefully this is enough.

          Jarek Jarcec Cecho added a comment:

          Sure, my pleasure. From a quick look at the patch: can you please fix the trailing whitespace warnings, Siddharth Ahuja?

          jarcec@arlene flume % git apply FLUME-2905-0.patch
          FLUME-2905-0.patch:25: space before tab in indent.
              	.setNameFormat("netcat-handler-%d").build());
          FLUME-2905-0.patch:50: trailing whitespace.
          
          FLUME-2905-0.patch:52: trailing whitespace.
             * Tests that the source is stopped when an exception is thrown
          FLUME-2905-0.patch:54: trailing whitespace.
             * clean up the sockets opened during source.start().
          FLUME-2905-0.patch:55: trailing whitespace.
             *
          warning: squelched 5 whitespace errors
          warning: 10 lines add whitespace errors.
          
          Grant Henke added a comment:

          Jarek Jarcec Cecho, would you have time to review this?

          Siddharth Ahuja added a comment:

          I have attached a patch that invokes stop() when a BindException is encountered because the port is already in use, which stops the source and cleans up the socket.
          I also moved the creation of the thread pool closer to where it is used, so that it does not have to be cleaned up in stop().
          A JUnit test is also provided.
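Moving the pool creation after the bind means a failed bind leaves only the socket to clean up. A minimal sketch under assumptions: `SketchSource` is hypothetical, the handler pool is assumed to be a cached thread pool, and only the "netcat-handler-" thread-name prefix comes from the patch excerpt quoted in this issue.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolAfterBind {

  // Hypothetical, simplified source; not the real NetcatSource.
  public static class SketchSource {
    public ServerSocketChannel serverSocket;
    public ExecutorService handlerService;

    public void start(SocketAddress address) throws IOException {
      ServerSocketChannel channel = ServerSocketChannel.open();
      try {
        channel.bind(address);
      } catch (IOException e) {
        channel.close(); // nothing else exists yet, so this is the only cleanup
        throw e;
      }
      serverSocket = channel;
      // Created only after a successful bind, so a bind failure leaves no pool behind.
      AtomicInteger counter = new AtomicInteger();
      ThreadFactory names = r -> new Thread(r, "netcat-handler-" + counter.getAndIncrement());
      handlerService = Executors.newCachedThreadPool(names);
    }

    public void stop() throws IOException, InterruptedException {
      if (handlerService != null) {
        handlerService.shutdown();
        handlerService.awaitTermination(5, TimeUnit.SECONDS);
        handlerService = null;
      }
      if (serverSocket != null) {
        serverSocket.close();
        serverSocket = null;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    try (ServerSocketChannel occupier = ServerSocketChannel.open()) {
      occupier.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      SketchSource source = new SketchSource();
      try {
        source.start(occupier.getLocalAddress());
      } catch (IOException bindFailure) {
        // Bind failed: no socket kept and no thread pool created.
        System.out.println("pool=" + source.handlerService + ", socket=" + source.serverSocket);
      }
    }
  }
}
```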

          I tested the patch as follows:
          • Started the HDFS (DataNode) service, which binds to port 50010, so the port is in use.
          • Started the updated flume-agent with a configuration containing a Netcat source with bind port 50010 and a File Roll sink.
          • Checked the flume logs and found the BindException caused by the port already being in use.
          • Ran "ps auxx|grep -i flume" to get the flume process id.
          • Ran "lsof -p <flume_proc_id> | wc -l" multiple times to check whether the number of file descriptors increases. It stays stable.
          • Stopped the HDFS service to free up port 50010.
          • Saw in the flume-agent logs that the source eventually started successfully with its socket bound to 50010.
          • From a new terminal, ran "nc localhost 50010" and entered some text.
          • The text was written to the local filesystem successfully.

          Siddharth Ahuja added a comment:

          Trying to get it assigned to myself...


            People

            • Assignee:
              Siddharth Ahuja
              Reporter:
              Siddharth Ahuja
            • Votes:
              1
              Watchers:
              9
