  1. Flume
  2. FLUME-2318

SpoolingDirectory is unable to handle empty files

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.7.0
    • Component/s: Sinks+Sources
    • Labels:
    • Release Note:
      Spooling Directory Source is capable of handling empty files.

      Description

      Empty files should be returned as an empty event instead of no event.

      Scenario

      From the start, consume files in this order:

      1. f1: File with data or empty file
      2. f2: Empty File
      3. No file in spooling directory

      Expected Outcome

      1. channel.take() should return event with f1 data.
      2. channel.take() should return event with f2 data (empty data).
      3. channel.take() should return null.
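
The expected outcome above can be illustrated with a minimal, self-contained sketch. It is not the Flume fix and does not use Flume classes: `EmptyFileEvents` and `toEventBodies` are hypothetical names, and the sketch assumes one event per line of text, with a zero-byte file mapped to a single zero-length event body instead of no events.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Illustration only: hypothetical helper, not part of Flume. */
final class EmptyFileEvents {
  private EmptyFileEvents() {}

  /**
   * Returns one event body per line of the file. A file with no lines
   * yields a single zero-length body rather than an empty list.
   */
  static List<byte[]> toEventBodies(Path file) throws IOException {
    List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
    List<byte[]> bodies = new ArrayList<>();
    if (lines.isEmpty()) {
      bodies.add(new byte[0]); // empty file: emit one empty event
      return bodies;
    }
    for (String line : lines) {
      bodies.add(line.getBytes(StandardCharsets.UTF_8));
    }
    return bodies;
  }
}
```

Under these assumptions, f1 containing "some data" produces one body with that text, and the empty f2 produces one body of length zero, so a consumer can distinguish "an empty file was processed" from "nothing was available".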

      What happens

      1. channel.take() returns event with f1 data.
      2. channel.take() returns null.
      3. An exception is raised when the SpoolDirectorySource thread tries to read events from the ReliableSpoolingFileEventReader. A snippet of the trace:
        2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
        2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
        2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
        2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
        java.lang.IllegalStateException: File should not roll when commit is outstanding.
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
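
One way to read the trace above is as a commit-tracking state machine that gets stuck when a read returns no events and is therefore never committed. The sketch below is a simplified toy model written to be consistent with the log messages, not the actual ReliableSpoolingFileEventReader source. `ToySpoolReader`, its fields, and the `poll` helper are hypothetical, and files are modeled as in-memory lists of lines.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Toy model of a commit-tracking spool reader. Hypothetical; not Flume code. */
final class ToySpoolReader {
  private final Deque<List<String>> pending = new ArrayDeque<>();
  private List<String> current;     // file being read, or null when none is open
  private int position;             // next unread line in the current file
  private int mark;                 // position as of the last commit
  private boolean committed = true; // false while a read awaits commit()

  ToySpoolReader(List<List<String>> files) {
    pending.addAll(files);
  }

  List<String> readEvents(int max) {
    if (!committed) {
      if (current == null) {
        throw new IllegalStateException(
            "File should not roll when commit is outstanding.");
      }
      position = mark; // previous read was never committed: rewind to the mark
    } else if (current == null) {
      openNext();
      if (current == null) {
        return Collections.emptyList();
      }
    }
    List<String> events = readFromCurrent(max);
    if (events.isEmpty()) {
      // End of the current file: roll to the next one, if any.
      openNext();
      if (current == null) {
        return Collections.emptyList(); // note: 'committed' is left unchanged
      }
      events = readFromCurrent(max);
    }
    committed = false;
    return events;
  }

  void commit() {
    mark = position;
    committed = true;
  }

  private void openNext() {
    current = pending.poll();
    position = 0;
    mark = 0;
  }

  private List<String> readFromCurrent(int max) {
    int end = Math.min(current.size(), position + max);
    List<String> out = new ArrayList<>(current.subList(position, end));
    position = end;
    return out;
  }

  /** Mimics a caller that commits only when a read returned events. */
  static List<String> poll(ToySpoolReader reader) {
    List<String> events = reader.readEvents(10);
    if (!events.isEmpty()) {
      reader.commit();
    }
    return events;
  }
}
```

With a one-line file followed by an empty file, the first `poll` returns the data and commits. The second rolls to the empty file and returns nothing, leaving a read uncommitted. The third rewinds, rolls past the empty file, and finds no further file. The fourth then throws the IllegalStateException, because a commit is still outstanding and no file is open.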

      Unit Test

      In TestSpoolDirectorySource

        @Test
        public void testWithEmptyFile2()
            throws InterruptedException, IOException {
          Context context = new Context();
          // f1 holds data; f2 is a zero-byte file.
          File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
          Files.write("some data".getBytes(), f1);
          File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
          Files.write(new byte[0], f2);
          context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
              tmpDir.getAbsolutePath());
          Configurables.configure(source, context);
          source.start();
          // Give the source thread time to pick up the files.
          Thread.sleep(10);
          // Expect one event per file, including an empty event for f2.
          for (int i = 0; i < 2; i++) {
            Transaction txn = channel.getTransaction();
            txn.begin();
            Event e = channel.take();
            Assert.assertNotNull("Event " + i + " should not be null", e);
            txn.commit();
            txn.close();
          }
          // Both files are consumed, so the channel should now be empty.
          Transaction txn = channel.getTransaction();
          txn.begin();
          Assert.assertNull(channel.take());
          txn.commit();
          txn.close();
        }
      

        Attachments

        1. FLUME-2318-2.patch
          13 kB
          Muhammad Ehsan ul Haque
        2. FLUME-2318-1.patch
          8 kB
          Muhammad Ehsan ul Haque
        3. FLUME-2318-0.patch
          7 kB
          Muhammad Ehsan ul Haque

            People

            • Assignee: Bessenyei Balázs Donát (bessbd)
            • Reporter: Muhammad Ehsan ul Haque (ehsan)
            • Votes: 1
            • Watchers: 7
