Flume / FLUME-1425

Create a SpoolDirectory Source and Client

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: v1.3.0
    • Component/s: None
    • Labels:
      None

      Description

      The proposal is to create a small executable client which reads logs from a spooling directory and sends them to a Flume sink, then performs cleanup on the directory (either by deleting or moving the logs). It would make the following assumptions:

      • Files placed in the directory are uniquely named
      • Files placed in the directory are immutable

      The problem this is trying to solve is that there is currently no way to guarantee event delivery across Flume agent restarts when the data is being collected through an asynchronous source (and not directly from the client API). Say, for instance, you are using an exec("tail -F") source. If the agent restarts, whether because of an error or intentionally, tail may pick up at a new position and you lose the data written in between.

      At the same time, there are users who want at-least-once semantics, and expect those to apply as soon as the data is written to disk from the initial logger process (e.g. apache logs), not just once it has reached a flume agent. This idea would bridge that gap, assuming the user is able to copy immutable logs to a spooling directory through a cron script or something.
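      The hand-off this paragraph assumes (a cron job copying finished logs into the spooling directory) could look roughly like the Java sketch below. It is an illustration, not part of the proposal: the class and method names are hypothetical, and it assumes a staging directory on the same filesystem as the spool directory so the final move is atomic. Copying straight into the spool directory would let the reader see a half-written file, breaking the immutability assumption; the random UUID suffix keeps names unique.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class SpoolHandOff {
  /**
   * Copies a finished (already rotated) log into the spool directory so the reader only
   * ever sees complete, uniquely named files. stagingDir must be on the same filesystem
   * as spoolDir, otherwise the ATOMIC_MOVE below fails.
   */
  static Path handOff(Path rotatedLog, Path stagingDir, Path spoolDir) throws IOException {
    // Unique name: original file name plus a random UUID, so two hand-offs of
    // "access.log" never collide in the spool directory.
    String uniqueName = rotatedLog.getFileName() + "." + UUID.randomUUID();
    Path staged = stagingDir.resolve(uniqueName);

    // The (possibly slow) copy happens outside the spool directory...
    Files.copy(rotatedLog, staged, StandardCopyOption.COPY_ATTRIBUTES);

    // ...and the complete file appears in the spool directory in one atomic step.
    return Files.move(staged, spoolDir.resolve(uniqueName), StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path rotated = Files.createTempFile("access", ".log");
    Files.write(rotated, "GET / HTTP/1.1\n".getBytes(StandardCharsets.UTF_8));
    Path staging = Files.createTempDirectory("staging");
    Path spool = Files.createTempDirectory("spool");
    System.out.println("spooled as " + handOff(rotated, staging, spool));
  }
}
```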

      The basic internal logic of such a client would be as follows:

      • Scan the directory for files
      • Choose a file and read through it, sending events to an agent
      • Close the file and delete it (or rename, or otherwise mark completed)

      That's about it. We could add sync-points to make recovery more efficient in the case of failure.
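      A minimal single-threaded sketch of that loop follows. It assumes the event sink is a plain Consumer<String> standing in for the Avro client or channel, and it marks finished files by renaming them with a hypothetical .COMPLETED suffix (deleting them would work equally well). It has no sync points, so after a crash the current file is re-read from the beginning: events may be duplicated, but none are lost.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class SpoolDirSketch {
  // Hypothetical marker suffix for finished files (an assumption for this sketch).
  static final String DONE_SUFFIX = ".COMPLETED";

  /** Processes every pending file in spoolDir once; returns the number of files completed. */
  static int processOnce(Path spoolDir, Consumer<String> sendEvent) throws IOException {
    // 1. Scan the directory for files that have not been marked completed yet.
    List<Path> pending = new ArrayList<>();
    try (Stream<Path> entries = Files.list(spoolDir)) {
      entries.filter(Files::isRegularFile)
             .filter(p -> !p.getFileName().toString().endsWith(DONE_SUFFIX))
             .forEach(pending::add);
    }
    // 2. Choose files in a deterministic order: oldest first, then by name.
    pending.sort(Comparator.comparing((Path p) -> p.toFile().lastModified())
                           .thenComparing(p -> p.getFileName().toString()));
    int completed = 0;
    for (Path file : pending) {
      // Read through the file, sending one event per line.
      try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
        String line;
        while ((line = reader.readLine()) != null) {
          sendEvent.accept(line);
        }
      }
      // 3. Mark the file completed only after every line was handed off. A crash before
      // this point means the file is re-read on restart: duplicates possible, no loss.
      Files.move(file, file.resolveSibling(file.getFileName() + DONE_SUFFIX),
                 StandardCopyOption.ATOMIC_MOVE);
      completed++;
    }
    return completed;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("spool");
    Files.write(dir.resolve("a.log"), "one\ntwo\n".getBytes(StandardCharsets.UTF_8));
    List<String> sent = new ArrayList<>();
    int n = processOnce(dir, sent::add);
    System.out.println(n + " file(s), events=" + sent);
  }
}
```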

      A key question is whether this should be implemented as a standalone client or as a source. My instinct is actually to do this as a source, but there could be some benefit to not requiring an entire agent in order to run it: specifically, it would become platform independent and you could stick it on Windows machines. Others I have talked to have also favored a standalone executable.

      1. FileProcessingSource.java
        10 kB
        Aaron Baff
      2. FLUME-1425.avro-conf-file.txt
        0.6 kB
        NO NAME
      3. FLUME-1425.patch.v1.txt
        33 kB
        NO NAME
      4. FLUME-1425.v5.patch.txt
        60 kB
        NO NAME
      5. FLUME-1425.v6.patch.txt
        63 kB
        NO NAME
      6. FLUME-1425.v6.patch.txt
        78 kB
        NO NAME
      7. FLUME-1425.v7.patch.txt
        67 kB
        NO NAME
      8. FLUME-1425.v8.patch.txt
        70 kB
        NO NAME
      9. FLUME-1425.v9.patch.txt
        70 kB
        NO NAME

          Activity

          Alexander Alten-Lorenz added a comment -

          I see, it is available in 1.3.0. Sorry for the confusion.

          Alexander Alten-Lorenz added a comment -

          Is this committed in TRUNK or 1.3.0-rc?

          Hudson added a comment -

          Integrated in flume-trunk #322 (See https://builds.apache.org/job/flume-trunk/322/)
          FLUME-1425. Create a SpoolDirectory Source and Client. (Revision 44c713f194da266a393a017e59093d6d1bb959b5)

          Result = ABORTED
          mpercy : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=44c713f194da266a393a017e59093d6d1bb959b5
          Files :

          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-core/src/main/java/org/apache/flume/client/avro/SpoolingFileLineReader.java
          • flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
          • flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
          • flume-ng-core/src/main/java/org/apache/flume/client/avro/LineReader.java
          • flume-ng-core/src/test/java/org/apache/flume/client/avro/TestBufferedLineReader.java
          • flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
          • flume-ng-core/src/main/java/org/apache/flume/client/avro/BufferedLineReader.java
          • flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
          • flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
          • flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
          • flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
          NO NAME added a comment -

          This patch addresses a bug in the way that file timestamps are treated in the unit tests.

          Due to varying time granularity in filesystems, tests had inconsistent results. This should fix that error.
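          One common way to make such tests independent of filesystem timestamp granularity is to pin each file's modification time explicitly instead of relying on the wall clock between writes. The helper below is a hypothetical sketch of that idea, not the code in the attached patch.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.concurrent.TimeUnit;

public class MtimeTestHelper {
  /**
   * Writes a test file and pins its modification time explicitly. Filesystems such as
   * HFS+ and ext3 store mtimes in whole seconds, so files written a few milliseconds
   * apart could otherwise end up with identical timestamps.
   */
  static Path writeWithMtime(Path dir, String name, String body, long epochSeconds)
      throws IOException {
    Path file = Files.write(dir.resolve(name), body.getBytes(StandardCharsets.UTF_8));
    Files.setLastModifiedTime(file, FileTime.from(epochSeconds, TimeUnit.SECONDS));
    return file;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("mtime");
    // Space the timestamps by whole seconds so the ordering survives coarse granularity.
    Path older = writeWithMtime(dir, "a.log", "first\n", 1_000_000_000L);
    Path newer = writeWithMtime(dir, "b.log", "second\n", 1_000_000_010L);
    int cmp = Files.getLastModifiedTime(older).compareTo(Files.getLastModifiedTime(newer));
    System.out.println(cmp < 0 ? "a.log is older" : "unexpected ordering");
  }
}
```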

          Mike Percy added a comment -

          Rather than @Ignore the test on commit, I posted a small patch to FLUME-1681 to disable the unit test for now.

          Mike Percy added a comment -

          Patch committed. Thanks Patrick!

          Rev: 44c713f194da266a393a017e59093d6d1bb959b5

          Mike Percy added a comment -

          +1

          Brock Noland added a comment -

          On a Mac, Java sets java.io.tmpdir (where that temp directory is created) to an unusual location, such as:

          /private/var/folders/b4/b44x97M0GFydt3jCKcowsU+++TI/Tmp/

          so it's possible something is going wrong in that respect.

          Mike Percy added a comment -

          Aaron Baff: Would love to get enhancements on top of this work after it's committed. You may want to file a JIRA for that.

          NO NAME: I am still getting a unit test error on my Mac. I'll try to dig into it more tomorrow. This is the stack trace:

          -------------------------------------------------------------------------------
          Test set: org.apache.flume.client.avro.TestSpoolingFileLineReader
          -------------------------------------------------------------------------------
          Tests run: 16, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.241 sec <<< FAILURE!
          testBehaviorWithEmptyFile(org.apache.flume.client.avro.TestSpoolingFileLineReader)  Time elapsed: 0.007 sec  <<< FAILURE!
          java.lang.AssertionError
            at org.junit.Assert.fail(Assert.java:92)
            at org.junit.Assert.assertTrue(Assert.java:43)
            at org.junit.Assert.assertTrue(Assert.java:54)
            at org.apache.flume.client.avro.TestSpoolingFileLineReader.testBehaviorWithEmptyFile(TestSpoolingFileLineReader.java:396)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
            at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
            at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
            at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
            at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
            at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:30)
            at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
            at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
            at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
            at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
            at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
            at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
            at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
            at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
            at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
            at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:236)
            at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:134)
            at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:113)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
            at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
            at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
            at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:103)
            at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:74)
          
          Aaron Baff added a comment -

          My fairly naive implementation of a Source to take files from a directory and process them in parallel. Not providing a patch as I'm intending this to be more of an inspiration to Patrick or Mike if they see anything useful in it. Or anyone else for that matter.

          NO NAME added a comment -

          Another version based on review feedback.

          Mike Percy added a comment -

          @Aaron makes sense. I think we can add that functionality to this in the next iteration.
          @Philz Agreed on the Guava thing. Adding a deserialization API on top of this is in scope for FLUME-1633.

          Philip Zeyliger added a comment -

          Cool stuff!

          Not sure if Flume already uses Guava, but if it does, I recommend Charsets.UTF_8 instead of calling Charset.forName("UTF-8") multiple times.

          It might be useful to comment explicitly that this is designed to read newline-delimited data. There is currently no way to configure it to read the entire file as a single record.
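          Both points can be illustrated with a small Java sketch (class and method names are hypothetical; this is not the patch's code). It uses one shared UTF-8 constant, and contrasts newline-delimited reading, where each line is a record, with the whole-file alternative that, per the comment above, the patch does not offer.

```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

public class RecordSplitting {
  // One shared constant instead of repeated Charset.forName("UTF-8") lookups.
  // StandardCharsets.UTF_8 (JDK 7+) serves the same purpose as Guava's Charsets.UTF_8.
  private static final Charset UTF8 = StandardCharsets.UTF_8;

  /** Newline-delimited input: every line becomes its own record. */
  static List<String> perLine(Path file) throws IOException {
    return Files.readAllLines(file, UTF8);
  }

  /** Whole-file input: the entire file becomes a single record. */
  static List<String> wholeFile(Path file) throws IOException {
    return Collections.singletonList(new String(Files.readAllBytes(file), UTF8));
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("records", ".txt");
    Files.write(file, "alpha\nbeta\n".getBytes(UTF8));
    System.out.println(perLine(file).size() + " records per line, "
        + wholeFile(file).size() + " record for the whole file");
  }
}
```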

          Aaron Baff added a comment -

          You can always write a Source that builds on top of this. I've written one which takes a file or directory, reads in all files from the directory (optionally recursively), and submits them to a ThreadPoolExecutor with a configurable number of threads. It worked quite well, and it lets the Sink run slowly, unlike something like a `cat` EXEC Source, which will simply lose records if the Channel/Sink can't keep up.

          Now, it doesn't monitor the directory for new files, and it doesn't rename files or look for a specific name pattern, but the latter two wouldn't be too hard to add. One could also add a monitor that scans every X seconds for new files matching the pattern and puts them on the Executor.
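          The approach described above (a fixed-size thread pool fed with files, plus a periodic rescan for new files matching a pattern) might look roughly like the sketch below. It is not taken from the attached FileProcessingSource.java. The class, the regex parameter and the per-file Consumer are hypothetical; in a real Source the processor would read the file and hand events to a channel. A concurrent set of submitted paths keeps a rescan from queuing the same file twice, but the sketch does not rename or delete finished files.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class ParallelFileScanner {
  private final Path dir;
  private final Pattern namePattern;
  private final Consumer<Path> processor;  // hypothetical per-file handler
  private final ExecutorService workers;   // a ThreadPoolExecutor under the hood
  private final Set<Path> submitted = ConcurrentHashMap.newKeySet();

  ParallelFileScanner(Path dir, String nameRegex, int threads, Consumer<Path> processor) {
    this.dir = dir;
    this.namePattern = Pattern.compile(nameRegex);
    this.processor = processor;
    this.workers = Executors.newFixedThreadPool(threads);
  }

  /** One scan pass: submits each matching file not seen before; returns how many were new. */
  int scanOnce() throws IOException {
    int newFiles = 0;
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
      for (Path p : entries) {
        boolean matches = namePattern.matcher(p.getFileName().toString()).matches();
        // Set.add returns false for files already submitted, so nothing is queued twice.
        if (Files.isRegularFile(p) && matches && submitted.add(p)) {
          workers.submit(() -> processor.accept(p));
          newFiles++;
        }
      }
    }
    return newFiles;
  }

  /** The "every X seconds" monitor: rescans the directory on a fixed schedule. */
  void scanEvery(ScheduledExecutorService timer, long seconds) {
    timer.scheduleAtFixedRate(() -> {
      try {
        scanOnce();
      } catch (IOException e) {
        // Ignored in this sketch; the next tick simply tries again.
      }
    }, 0, seconds, TimeUnit.SECONDS);
  }

  /** Stops accepting work and waits for in-flight files to finish. */
  boolean shutdown(long timeoutSeconds) throws InterruptedException {
    workers.shutdown();
    return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    Path dir = Files.createTempDirectory("scan");
    for (String name : new String[] {"a.log", "b.log", "c.tmp"}) {
      Files.createFile(dir.resolve(name));
    }
    ParallelFileScanner scanner = new ParallelFileScanner(dir, ".*\\.log", 2,
        p -> System.out.println(Thread.currentThread().getName() + " read " + p.getFileName()));
    System.out.println("first scan queued " + scanner.scanOnce() + " file(s)");
    System.out.println("second scan queued " + scanner.scanOnce() + " file(s)");
    scanner.shutdown(5);
  }
}
```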

          Mike Percy added a comment -

          Yeah actually you could do that with the implementation in this patch as long as you used different directories for each source.

          Brock Noland added a comment -

          You could of course scale this manually by running multiple sources, one per NAS/SAN interface.

          Mike Percy added a comment -

          Hans, that is out of scope of this JIRA but it could potentially be done on top of this work in the future using file locks or something like that. Obviously with concurrency it gets more complicated.
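          A sketch of one possible claim mechanism for one-file-per-reader scaling follows, under stated assumptions: instead of a file lock, this hypothetical helper uses an atomic rename as the claim (a swapped-in technique, not something the patch implements). Whichever reader renames a file first owns it; the others get NoSuchFileException and move on. Caveats: rename atomicity on network storage (e.g. NFS) depends on the filesystem, and a reader that crashes after claiming leaves a `.claimed` file that something must recover.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

public class FileClaim {
  static final String CLAIM_SUFFIX = ".claimed"; // hypothetical marker; scanners skip these

  /**
   * Tries to claim a spooled file for one reader by renaming it atomically.
   * When several readers race for the same file, exactly one rename succeeds; the others
   * find the source gone (NoSuchFileException) and move on to another file.
   */
  static Optional<Path> tryClaim(Path file, String readerId) throws IOException {
    Path claimed = file.resolveSibling(file.getFileName() + "." + readerId + CLAIM_SUFFIX);
    try {
      return Optional.of(Files.move(file, claimed, StandardCopyOption.ATOMIC_MOVE));
    } catch (NoSuchFileException lostRace) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("claims");
    Path log = Files.createFile(dir.resolve("part-0001.log"));
    System.out.println("reader-1 claim: " + tryClaim(log, "reader-1").isPresent());
    System.out.println("reader-2 claim: " + tryClaim(log, "reader-2").isPresent());
  }
}
```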

          Hans Uhlig added a comment -

          Is there any way to scale this horizontally without duplicating events? One file per client, obviously, but multiple file readers for faster overall I/O (the specific use case is high-bandwidth NAS/SAN drives).

          NO NAME added a comment -

          Attaching most recent version of the patch.

          NO NAME added a comment -

          Correct version of v6

          NO NAME added a comment -

          Adding another patch which addresses several comments from Mike Percy.

          NO NAME added a comment -

          This patch is ready for review. It creates both a new source (SpoolDirectorySource) and adds a spooling directory capability to the existing avro client.

          Includes extensive unit tests - probably best to start with those.

          NO NAME added a comment -

          This is an implementation of a spooling client which extends the existing avro client. See reviewboard for more color.

          NO NAME added a comment -

          One story would be to add this as an option to the AvroCLIClient. Right now you can either read from stdin or a single file, a natural extension would be to watch a directory and read from files dropped in that directory.


            People

            • Assignee:
              NO NAME
              Reporter:
              NO NAME
            • Votes:
              1
              Watchers:
              9
