FLUME-2318

SpoolingDirectory is unable to handle empty files

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: v1.4.0
    • Fix Version/s: v1.7.0
    • Component/s: Sinks+Sources
    • Labels:
    • Release Note:
      Spooling Directory Source is now able to handle empty files.

      Description

      Empty files should be returned as an empty event instead of no event.

      Scenario

      Starting from a fresh source, files are consumed in this order:

      1. f1: File with data or empty file
      2. f2: Empty File
      3. No file in spooling directory

      Expected Outcome

      1. channel.take() should return event with f1 data.
      2. channel.take() should return event with f2 data (empty data).
      3. channel.take() should return null.

      What happens

      1. channel.take() returns event with f1 data.
      2. channel.take() returns null.
      3. An exception is raised when the SpoolDirectorySource thread tries to read events from the ReliableSpoolingFileEventReader. A snippet of the trace:
        2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
        2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
        2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
        2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
        java.lang.IllegalStateException: File should not roll when commit is outstanding.
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

      Unit Test

      In TestSpoolDirectorySource

        @Test
        public void testWithEmptyFile2()
            throws InterruptedException, IOException {
          Context context = new Context();
          // f1 contains data; f2 is empty and is the last file in the directory.
          File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
          Files.write("some data".getBytes(), f1);
          File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
          Files.write(new byte[0], f2);
          context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
              tmpDir.getAbsolutePath());
          Configurables.configure(source, context);
          source.start();
          // Give the source thread a moment to pick up the files.
          Thread.sleep(10);
          // Take twice: one event is expected per file.
          for (int i = 0; i < 2; i++) {
            Transaction txn = channel.getTransaction();
            txn.begin();
            Event e = channel.take();
            txn.commit();
            txn.close();
          }
          // Both files have been consumed, so a third take should return null.
          Transaction txn = channel.getTransaction();
          txn.begin();
          Assert.assertNull(channel.take());
          txn.commit();
          txn.close();
        }
      
      1. FLUME-2318-0.patch
        7 kB
        Muhammad Ehsan ul Haque
      2. FLUME-2318-1.patch
        8 kB
        Muhammad Ehsan ul Haque
      3. FLUME-2318-2.patch
        13 kB
        Muhammad Ehsan ul Haque

          Activity

          Muhammad Ehsan ul Haque added a comment -

          Empty files should be returned as events with an empty body.

          Muhammad Ehsan ul Haque added a comment -

          This patch enables SpoolingDirectorySource to handle empty files and treat them as events with an empty body (new byte[0]).

          Muhammad Ehsan ul Haque added a comment -

          Review Request: https://reviews.apache.org/r/17883/

          Muhammad Ehsan ul Haque added a comment -

          Can anybody review this patch? It has been waiting for review for some time.

          Muhammad Ehsan ul Haque added a comment -

          Can anybody review this patch? It has been waiting for review for some time.

          Hari Shreedharan added a comment -

          I will look at this in a couple days.

          Muhammad Ehsan ul Haque added a comment -

          Reminder for review!!

          Hari Shreedharan added a comment -

          This looks good to me. Let's add an update to the doc that says that an empty file will cause an event with an empty body to be sent.

          Muhammad Ehsan ul Haque added a comment -

          Updated the UserGuide as well. Please review now.

          Muhammad Ehsan ul Haque added a comment -

          Hari Shreedharan, is anything more required?

          Hari Shreedharan added a comment -

          Looking at this one now.

          Hari Shreedharan added a comment -

          This one seems to have caused test failures:

          
          Failed tests:   testRepeatedCallsWithCommitAlways(org.apache.flume.client.avro.TestReliableSpoolingFileEventReader): expected:<7> but was:<8>
            testRepeatedCallsWithCommitOnSuccess(org.apache.flume.client.avro.TestReliableSpoolingFileEventReader): expected:<7> but was:<8>
            testBehaviorWithEmptyFile(org.apache.flume.client.avro.TestSpoolingFileLineReader): expected:<8> but was:<1>
          
          
          Muhammad Ehsan ul Haque added a comment -

          My bad, I should have run all the tests.
          After looking at the failing tests, I think reading empty files as events should be offered as an option rather than being the default behavior. Just fixing the failing tests is not the right solution.
          What do you think?
          Should I add an option for reading empty files as events and update the documentation accordingly?

          Muhammad Ehsan ul Haque added a comment -

          I re-evaluated and narrowed down the exact cause of the bug.
          To be exact, the spooling directory source is unable to handle an empty file if that empty file happens to be the last consumable file in the directory. This is because the committed flag in org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java is not set properly for empty files.

          To demonstrate, here is a test case that fails when added to org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java. It is a slight modification of the existing test method testRepeatedCallsWithCommitOnSuccess:

          @Test
          public void testRepeatedCallsWithCommitOnSuccessWithEmptyFileInTheLast() throws IOException {
            // Create an empty file in the spool directory.
            Files.touch(new File(WORK_DIR, "newEmptyFile"));

            String trackerDirPath =
                SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
            File trackerDir = new File(WORK_DIR, trackerDirPath);

            ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
                .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();

            final int expectedLines = 0 + 1 + 2 + 3 + 1;
            int seenLines = 0;
            for (int i = 0; i < 10; i++) {
              List<Event> events = reader.readEvents(10);
              int numEvents = events.size();
              // Commit only when the read returned events.
              if (numEvents > 0) {
                seenLines += numEvents;
                reader.commit();

                // ensure that there are files in the trackerDir
                File[] files = trackerDir.listFiles();
                Assert.assertNotNull(files);
                Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir
                    .getAbsolutePath(), files.length > 0);
              }
            }

            Assert.assertEquals(expectedLines, seenLines);
          }
          

          This is the exception thrown

          java.lang.IllegalStateException: File should not roll when commit is outstanding.
              at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:224)
              at org.apache.flume.client.avro.TestReliableSpoolingFileEventReader.testRepeatedCallsWithCommitOnSuccess(TestReliableSpoolingFileEventReader.java:152)
          

          Possible Fixes

          • Fix the test cases testRepeatedCallsWithCommitOnSuccess and the new testRepeatedCallsWithCommitOnSuccessWithEmptyFileInTheLast to always commit, even if reader.readEvents() returns an empty list, and change callers such as org/apache/flume/source/SpoolDirectorySource.java to do the same.
          • Explicitly set the committed flag to true when the file has been rolled successfully in org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java.

          I would also like an optional flag in the spooling directory source that allows empty files to be read as events, because in some use cases the file name and other metadata related to the file are important to the receiving end. The current implementation just ignores empty files.
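
The failure mode and the second proposed fix can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not the Flume implementation: ToySpoolReader, markCommittedOnRoll, and poll are hypothetical names, files are modeled as in-memory lists of lines, and the control flow only approximates how the real reader tracks its committed flag. The caller commits only non-empty batches, as the test case in this comment does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Toy stand-in for a spooling reader; NOT the Flume class. */
final class ToySpoolReader {
    private final Deque<List<String>> pending = new ArrayDeque<>();
    private List<String> current;              // null when no file is open
    private boolean committed = true;
    private final boolean markCommittedOnRoll; // hypothetical switch modeling the second fix

    ToySpoolReader(boolean markCommittedOnRoll, List<List<String>> files) {
        this.markCommittedOnRoll = markCommittedOnRoll;
        for (List<String> f : files) {
            pending.add(new ArrayList<>(f));   // copy so draining does not touch the input
        }
    }

    List<String> readEvents() {
        if (!committed) {
            if (current == null) {
                throw new IllegalStateException(
                    "File should not roll when commit is outstanding.");
            }
            // A real reader would rewind to its last mark here; the toy omits that
            // because poll() below always commits non-empty batches.
        } else {
            if (current == null) {
                current = pending.poll();
            }
            if (current == null) {
                return Collections.emptyList();
            }
        }
        List<String> events = drain(current);
        if (events.isEmpty()) {
            // Current file is exhausted: roll to the next one, if any.
            current = pending.poll();
            if (markCommittedOnRoll) {
                committed = true;
            }
            if (current == null) {
                return Collections.emptyList();
            }
            events = drain(current);
        }
        committed = false;
        return events;
    }

    void commit() {
        committed = true;
    }

    private static List<String> drain(List<String> file) {
        List<String> out = new ArrayList<>(file);
        file.clear();
        return out;
    }

    /** Polls like a caller that commits only when a batch is non-empty. */
    static int poll(ToySpoolReader reader, int rounds) {
        int seen = 0;
        for (int i = 0; i < rounds; i++) {
            List<String> events = reader.readEvents();
            if (!events.isEmpty()) {
                seen += events.size();
                reader.commit();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
            Arrays.asList("some data"), Collections.<String>emptyList());
        System.out.println(poll(new ToySpoolReader(true, files), 4));
        try {
            poll(new ToySpoolReader(false, files), 4);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the switch off, the fourth poll reaches the outstanding-commit check after the empty last file has been rolled and throws. With it on, the same sequence completes and one event is seen.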

          Hari Shreedharan added a comment -

          OK, let's do it as optional and keep the current behavior the same. We should parameterize the tests with one that ignores and one that sends empty files with the corresponding metadata. We should ensure correct behavior in both cases.
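
The idea of checking both modes can be sketched as a table-driven check in plain Java. Everything here is illustrative: consumeEmptyFiles is a hypothetical flag name (not a documented Flume configuration key), ToyEvent is a stand-in for a Flume event, and files are modeled as an in-memory map from file name to contents.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EmptyFileModes {
    /** Toy event: a body plus the name of the file it came from. */
    static final class ToyEvent {
        final String fileName;
        final byte[] body;

        ToyEvent(String fileName, byte[] body) {
            this.fileName = fileName;
            this.body = body;
        }
    }

    /**
     * Turns each file into one event. Empty files are skipped unless the
     * hypothetical consumeEmptyFiles flag is set, in which case they yield
     * an event with an empty body that still carries the file name.
     */
    static List<ToyEvent> toEvents(Map<String, byte[]> files, boolean consumeEmptyFiles) {
        List<ToyEvent> events = new ArrayList<>();
        for (Map.Entry<String, byte[]> f : files.entrySet()) {
            if (f.getValue().length == 0 && !consumeEmptyFiles) {
                continue;
            }
            events.add(new ToyEvent(f.getKey(), f.getValue()));
        }
        return events;
    }

    /** One file with data followed by one empty file, in insertion order. */
    static Map<String, byte[]> sampleFiles() {
        Map<String, byte[]> files = new LinkedHashMap<>();
        files.put("file1", "some data".getBytes(StandardCharsets.UTF_8));
        files.put("file2", new byte[0]);
        return files;
    }

    public static void main(String[] args) {
        // Each row pairs a mode with the number of events it should produce.
        boolean[] modes = {false, true};
        int[] expectedCounts = {1, 2};
        for (int i = 0; i < modes.length; i++) {
            int actual = toEvents(sampleFiles(), modes[i]).size();
            if (actual != expectedCounts[i]) {
                throw new AssertionError("mode " + modes[i] + ": got " + actual);
            }
        }
        System.out.println("both modes behave as expected");
    }
}
```

In a real test suite the same table would more likely be expressed with the test framework's parameterization support; the loop above only shows the shape of the check.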

          Muhammad Ehsan ul Haque added a comment -

          Fixed the bug that caused an exception to be thrown when an empty file is the last file consumed by the spooling directory source. Also added an option to consume empty files as events. Updated the user guide accordingly.

          Muhammad Ehsan ul Haque added a comment -

          Reminder for request for review!!

          Muhammad Ehsan ul Haque added a comment -

          Gentle reminder for review.

          Muhammad Ehsan ul Haque added a comment -

          Hari Shreedharan, can you please look into it? It was deferred due to the release of 1.5.

          Bessenyei Balázs Donát added a comment -

          I have created a new reviewboard request: https://reviews.apache.org/r/50286/
          (Rebasing it on trunk and some autoformat)

          NOTE: this patch changes how SpoolingDirectory source works. See lines 433 and 434 in reviewboard.

          Bessenyei Balázs Donát added a comment -

          The patch for this issue has been available for a while, but it needs some reviews to be committed to trunk.

          Is there anyone who could help with this?

          Mike Percy added a comment -

          +1. I am about to commit this.

          ASF subversion and git services added a comment -

          Commit a9a775a9c8324e0724ed4720ae4f383896ea8c96 in flume's branch refs/heads/trunk from Bessenyei Balázs Donát
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=a9a775a ]

          FLUME-2318: Make SpoolingDirectorySource able to handle empty files

          (Bessenyei Balázs Donát via Mike Percy)

          Mike Percy added a comment -

          Pushed to trunk. Thanks for the patch, Muhammad and Bessenyei!

          Hudson added a comment -

          FAILURE: Integrated in Jenkins build Flume-trunk-hbase-1 #194 (See https://builds.apache.org/job/Flume-trunk-hbase-1/194/)
          FLUME-2318: Make SpoolingDirectorySource able to handle empty files (mpercy: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=a9a775a9c8324e0724ed4720ae4f383896ea8c96)

          • (edit) flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
          • (edit) flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
          ASF subversion and git services added a comment -

          Commit b6dede8c108e0d8e3516767de31e3a847c69cfe0 in flume's branch refs/heads/trunk from Bessenyei Balázs Donát
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=b6dede8 ]

          FLUME-2318: Make SpoolingDirectorySource able to handle empty files

          (Muhammad Ehsan ul Haque and Bessenyei Balázs Donát via Mike Percy)


            People

            • Assignee: Bessenyei Balázs Donát
            • Reporter: Muhammad Ehsan ul Haque
            • Votes: 1
            • Watchers: 7
