Flume / FLUME-2309

Spooling directory should not always consume the oldest file first.

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.5.0
    • Component/s: None
    • Labels:
    • Release Note:
      Adds a consume order feature to the spooling directory source, allowing files to be consumed in the order of oldest (default), youngest or random.

      Description

      The ReliableSpoolingFileEventReader reads the oldest file in the spooling directory first. It does this by listing the directory contents and then sorting the file list by timestamp, which can be very slow if the directory holds a lot of files (of the order of 100K or more).
      However, this is not always needed; there are simple cases in which the order in which files are consumed is not important.
      There should be an option to consume the files in arbitrary order, so that they can be consumed quickly without the sorting delay.

      1. FLUME-2309-commit.patch
        27 kB
        Hari Shreedharan
      2. FLUME-2309-1.patch
        28 kB
        Muhammad Ehsan ul Haque
      3. FLUME-2309-0.patch
        28 kB
        Muhammad Ehsan ul Haque
      4. FLUME-2309-0.patch
        29 kB
        Muhammad Ehsan ul Haque

          Activity

          hshreedharan Hari Shreedharan added a comment -

          Makes sense to me. It should be a simple fix, but we must document it, since older files might get sent only very late. Maybe we can make it smarter even when we sort? Reuse the sorted list until it is empty to avoid a sort every time (not sure if we already do this).
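
          The "reuse the sorted list until it is empty" idea could be sketched roughly as below. This is only an illustration under stated assumptions: SortedFileCache is a hypothetical helper, not a Flume class, and it assumes the caller deletes or renames each file once it has been consumed. The directory is still listed and sorted in full on every refill.

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;

// Hypothetical helper (not part of Flume): sorts one directory listing by
// modification time and hands files out of it until the cache is drained.
class SortedFileCache {
  private final File dir;
  private final Deque<File> pending = new ArrayDeque<File>();

  SortedFileCache(File dir) {
    this.dir = dir;
  }

  /** Returns the oldest cached file still on disk, or null if none is left. */
  File next() {
    File f = pollExisting();
    if (f == null) {
      refill();            // list and sort again only when the cache is empty
      f = pollExisting();
    }
    return f;
  }

  // Drops cached entries that were deleted after the listing was taken.
  private File pollExisting() {
    File f;
    while ((f = pending.poll()) != null) {
      if (f.exists()) {
        return f;
      }
    }
    return null;
  }

  private void refill() {
    File[] listed = dir.listFiles();
    if (listed == null) {
      return;              // not a directory, or an I/O error occurred
    }
    Arrays.sort(listed, new Comparator<File>() {
      @Override
      public int compare(File a, File b) {
        return Long.compare(a.lastModified(), b.lastModified());
      }
    });
    pending.addAll(Arrays.asList(listed));
  }
}
```

          The trade-off is that a file arriving while the cache is non-empty is not seen until the next refill, even if it is older than the files still queued.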

          ehsan Muhammad Ehsan ul Haque added a comment -

          No, it doesn't maintain a sorted list. Before consuming a file, it gets a list of files, and if the list is not empty, it sorts it.

          List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
          if (candidateFiles.isEmpty()) {
            return Optional.absent();
          } else {
            Collections.sort(candidateFiles, new Comparator<File>() {
            ....
          

          We can have a boolean parameter, let's say consumeOldestFirst.

          If its value is true, then

          • use a sorted buffer and consume from it. For this we will need to check whether each file still exists or has been deleted. However, if there are very many files (of the order of millions), this will be very resource-consuming, as we still need to get a list of files before sorting and buffering the top N sorted.

          else

          • use Java Files.newDirectoryStream, but as mentioned by Hari Shreedharan, this may cause older files to be sent very late.

          I can produce a patch for it, if someone can accept this proposal or propose something else.

          hshreedharan Hari Shreedharan added a comment -

          In case there are millions of files, the directory listing itself would take a lot of time. I don't know if there is much we can do in that case. Also, like you said, keeping a sorted buffer is not a good idea when there are so many files. Do you see another way out?

          Unfortunately, we cannot use a directory stream since we still need to support Java 6.

          Please go ahead and work on this.

          ehsan Muhammad Ehsan ul Haque added a comment -

          Perhaps we can use Apache Commons FileUtils.iterateFiles.
          If we want the files to be consumed in an arbitrary order, then I believe an iterator will be very cheap.

          Okay I will work on this.

          ehsan Muhammad Ehsan ul Haque added a comment -

          This patch:

          • Adds a consume order feature to the spooling directory source, which allows users to state explicitly in which order (oldest, youngest or random) files should be consumed from the spooling directory.
          • Fixes the old implementation of selecting the file from the spooling directory. Previously, each file to be consumed was selected by sorting, which might become extremely time-consuming if there are many files (of the order of 10K or more). The new implementation instead does a linear scan when the consume order is oldest or youngest.
          • Updates the Flume user guide accordingly.
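
          The selection logic described in the list above can be sketched as follows. This is a minimal illustration, not the code from the patch: ConsumeOrderSketch, Candidate and select are hypothetical names, and a Candidate stands in for a spooled file so the example does not touch the file system.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch (not the patch itself): pick one file per call with a
// single pass over the candidates instead of sorting the whole list.
class ConsumeOrderSketch {
  enum ConsumeOrder { OLDEST, YOUNGEST, RANDOM }

  /** Minimal stand-in for a spooled file: a name plus a modification time. */
  static final class Candidate {
    final String name;
    final long lastModified;

    Candidate(String name, long lastModified) {
      this.name = name;
      this.lastModified = lastModified;
    }
  }

  /** Returns the candidate to consume next, or null if the list is empty. */
  static Candidate select(List<Candidate> candidates, ConsumeOrder order, Random rng) {
    if (candidates.isEmpty()) {
      return null;
    }
    if (order == ConsumeOrder.RANDOM) {
      // No timestamp comparison at all for the random order.
      return candidates.get(rng.nextInt(candidates.size()));
    }
    // Linear scan: O(N) comparisons instead of an O(N log N) sort.
    Candidate best = candidates.get(0);
    for (Candidate c : candidates) {
      boolean better = (order == ConsumeOrder.OLDEST)
          ? c.lastModified < best.lastModified
          : c.lastModified > best.lastModified;
      if (better) {
        best = c;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    List<Candidate> files = Arrays.asList(
        new Candidate("b.log", 200L),
        new Candidate("a.log", 100L),
        new Candidate("c.log", 300L));
    System.out.println(select(files, ConsumeOrder.OLDEST, new Random()).name);
    System.out.println(select(files, ConsumeOrder.YOUNGEST, new Random()).name);
  }
}
```

          Note that this sketch does not break ties between files with equal timestamps; the first one encountered in list order wins.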
          ehsan Muhammad Ehsan ul Haque added a comment -

          Removed trailing white spaces from the patch.

          ehsan Muhammad Ehsan ul Haque added a comment -

          Review request https://reviews.apache.org/r/17867/

          ehsan Muhammad Ehsan ul Haque added a comment -

          Can anybody review this patch? It's been waiting for review for some time.

          ehsan Muhammad Ehsan ul Haque added a comment - - edited

          Reminder for review.

          ehsan Muhammad Ehsan ul Haque added a comment -

          Has anyone been able to review it?

          hshreedharan Hari Shreedharan added a comment -

          FYI - This does not really solve the listing problem. The iterateFiles method does seem to list the files anyway (I looked at the commons source code). I'd recommend using the older code for listing.

          This looks good. Only one change I'd recommend, the default order cannot change. It must stay random, since the additional overhead of comparison is something that most people don't expect. Other than that, this patch looks good to go. If you change that to default to the old one, I will commit this.

          ehsan Muhammad Ehsan ul Haque added a comment -

          I think listing was not the problem, it was sorting the files.

          The default order in the current implementation is oldest first (because of the sort), and the fix does not change that; it is still oldest first. I just improved it by replacing the sort, which is O(N*logN), with a single scan over all the files that picks the oldest one, which is O(N).

          I can use the older code for listing instead of iterator if you want?

          hshreedharan Hari Shreedharan added a comment -

          I like the oldest/newest first implementation without sorting.

          Yes, we should probably remove the IOFileFilter and the iterator - seems a bit confusing. Oh, yes, I remember now (it was the whole point of the jira) that default was oldest first.

          ehsan Muhammad Ehsan ul Haque added a comment -

          Patch Updated.

          Reverted back to the old FileFilter.
          Kept the default consume order as oldest, as it was the behavior originally.

          hshreedharan Hari Shreedharan added a comment -

          +1. I made some changes to the patch (removed the Comparator as that was not in line with the interface definition). I am committing this now.

          hshreedharan Hari Shreedharan added a comment -

          The actual patch being committed.

          jira-bot ASF subversion and git services added a comment -

          Commit 61b9bcbb69ae3d19f72276b3aaa78ff3679cecfc in flume's branch refs/heads/trunk from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=61b9bcb ]

          FLUME-2309. Spooling directory should not always consume the oldest file first.

          (Muhammad Ehsan ul Haque via Hari Shreedharan)

          jira-bot ASF subversion and git services added a comment -

          Commit 9df1abb84a17874a3e658fcea155aa865506aff7 in flume's branch refs/heads/flume-1.5 from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=9df1abb ]

          FLUME-2309. Spooling directory should not always consume the oldest file first.

          (Muhammad Ehsan ul Haque via Hari Shreedharan)

          hshreedharan Hari Shreedharan added a comment -

          Committed! Thanks Muhammad!

          hudson Hudson added a comment -

          UNSTABLE: Integrated in flume-trunk #630 (See https://builds.apache.org/job/flume-trunk/630/)
          FLUME-2309. Spooling directory should not always consume the oldest file first. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo/?p=flume.git&a=commit&h=61b9bcbb69ae3d19f72276b3aaa78ff3679cecfc)

          • flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
          • flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
          • flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
          • flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
          ehsan Muhammad Ehsan ul Haque added a comment -

          Your changes for selecting the YOUNGEST/OLDEST file are not deterministic and violate what is written in the documentation, and also the old behavior, which used to select the OLDEST file.
          The documented and old behavior was that, in case of a tie on timestamp, the file with the lexicographically smallest name is picked. This is missing in the patch that has been committed. It is also one reason the tests are failing, as mentioned in FLUME-2350.
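
          A deterministic selection with the tie-break described above might look like the sketch below. It is an illustration only: TieBreakSketch, Candidate, byAge and pick are hypothetical names, and applying the same "smallest name wins" rule to the youngest order is an assumption, not something confirmed by the committed code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: choose by timestamp in one pass and break ties on the
// lexicographically smallest name, so the result does not depend on the
// order in which the directory listing returned the files.
class TieBreakSketch {
  /** Minimal stand-in for a spooled file: a name plus a modification time. */
  static final class Candidate {
    final String name;
    final long lastModified;

    Candidate(String name, long lastModified) {
      this.name = name;
      this.lastModified = lastModified;
    }
  }

  /** Orders by timestamp (oldest or youngest first), then by name ascending. */
  static Comparator<Candidate> byAge(final boolean oldestFirst) {
    return new Comparator<Candidate>() {
      @Override
      public int compare(Candidate a, Candidate b) {
        int byTime = oldestFirst
            ? Long.compare(a.lastModified, b.lastModified)
            : Long.compare(b.lastModified, a.lastModified);
        return byTime != 0 ? byTime : a.name.compareTo(b.name);
      }
    };
  }

  /** Collections.min is a single linear pass, so no sort is needed. */
  static Candidate pick(List<Candidate> candidates, boolean oldestFirst) {
    return candidates.isEmpty() ? null : Collections.min(candidates, byAge(oldestFirst));
  }

  public static void main(String[] args) {
    List<Candidate> files = Arrays.asList(
        new Candidate("b", 100L), new Candidate("a", 100L),
        new Candidate("d", 200L), new Candidate("c", 200L));
    System.out.println(pick(files, true).name);   // oldest, tie broken by name
    System.out.println(pick(files, false).name);  // youngest, tie broken by name
  }
}
```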


            People

            • Assignee:
              ehsan Muhammad Ehsan ul Haque
              Reporter:
              ehsan Muhammad Ehsan ul Haque
            • Votes:
              0
              Watchers:
              4
