Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Version: 1.4.0
Spooling Directory Source is capable of handling empty files.
Description
Empty files should be returned as an empty event instead of no event.
Scenario
From startup, the source consumes files in this order:
- f1: File with data or empty file
- f2: Empty File
- No file in spooling directory
Expected Outcome
- channel.take() should return event with f1 data.
- channel.take() should return event with f2 data (empty data).
- channel.take() should return null.
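The expected outcome above can be sketched outside Flume as follows. This is a minimal stand-alone illustration, not the actual fix: the class `EmptyFileEvents` and its methods are hypothetical, and it assumes one event body per line of text, with an empty file mapped to a single zero-length body.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Flume. */
final class EmptyFileEvents {

    /** One event body per line; an empty file yields a single zero-length body. */
    static List<byte[]> eventsFor(Path file) {
        try {
            List<byte[]> bodies = new ArrayList<>();
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                bodies.add(line.getBytes(StandardCharsets.UTF_8));
            }
            if (bodies.isEmpty()) {
                // Empty file: emit an empty event instead of no event.
                bodies.add(new byte[0]);
            }
            return bodies;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates f1 (with data) and f2 (empty) in a temp directory; returns all bodies in order. */
    static List<byte[]> demo() {
        try {
            Path dir = Files.createTempDirectory("spool");
            Path f1 = Files.write(dir.resolve("file1"),
                    "some data".getBytes(StandardCharsets.UTF_8));
            Path f2 = Files.createFile(dir.resolve("file2"));
            List<byte[]> all = new ArrayList<>(eventsFor(f1));
            all.addAll(eventsFor(f2));
            return all;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (byte[] body : demo()) {
            System.out.println(body.length + " byte(s)");
        }
    }
}
```

In this model the consumer sees two bodies, the second of length zero, and then nothing, which mirrors the three expected channel.take() results.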
What happens
- channel.take() returns event with f1 data.
- channel.take() returns null.
- An exception is raised when the SpoolDirectorySource thread tries to read events from the ReliableSpoolingFileEventReader. A snippet of the trace:
2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File should not roll when commit is outstanding.
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
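One plausible reading of the log sequence above (roll file1, "Last read was never committed", roll file2, then the IllegalStateException) is that an empty read leaves a commit outstanding while the reader rolls past the last file. The toy model below reproduces that sequence under this assumption. `ToyReader`, its fields, and the `poll` loop are hypothetical simplifications written for this illustration. They are not Flume's code, and the real ReliableSpoolingFileEventReader may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of a commit-tracking file reader; not Flume code. */
final class ToyReader {
    private final Deque<List<String>> files = new ArrayDeque<>();
    private List<String> current;      // null when no file is open
    private int position;              // index of the next unread event in current
    private int mark;                  // position at the last commit
    private boolean committed = true;  // false while a read awaits commit()

    void addFile(List<String> events) { files.addLast(new ArrayList<>(events)); }

    void commit() { committed = true; mark = position; }

    List<String> readEvents() {
        if (!committed) {
            if (current == null) {
                throw new IllegalStateException(
                        "File should not roll when commit is outstanding.");
            }
            position = mark; // last read was never committed: reset to the mark
        } else if (current == null) {
            current = files.pollFirst();
            if (current == null) return Collections.emptyList();
        }
        List<String> events = drain();
        if (events.isEmpty()) {
            // End of the current file: roll to the next one, if any.
            current = files.pollFirst();
            position = 0;
            mark = 0;
            if (current == null) return Collections.emptyList();
            events = drain(); // an empty file yields an empty list here
        }
        committed = false; // set even when events is empty
        return events;
    }

    private List<String> drain() {
        List<String> out = new ArrayList<>(current.subList(position, current.size()));
        position = current.size();
        return out;
    }

    /** Models one scheduled run of the source: read and commit until a read is empty. */
    static void poll(ToyReader reader, List<String> channel) {
        while (true) {
            List<String> events = reader.readEvents();
            if (events.isEmpty()) break; // nothing delivered, so commit() is skipped
            channel.addAll(events);
            reader.commit();
        }
    }

    static String demo() {
        ToyReader reader = new ToyReader();
        reader.addFile(Arrays.asList("some data"));       // f1
        reader.addFile(Collections.<String>emptyList());  // f2 (empty file)
        List<String> channel = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) poll(reader, channel);
            return "no exception; channel=" + channel;
        } catch (IllegalStateException e) {
            return e.getMessage() + " channel=" + channel;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the first poll delivers f1 and rolls to f2, the second poll resets and rolls past f2 with the commit still outstanding, and the third poll throws. Emitting an empty event for f2 would give the source something to commit and avoid that state.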
Unit Test
In TestSpoolDirectorySource
@Test
public void testWithEmptyFile2() throws InterruptedException, IOException {
  Context context = new Context();
  File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
  Files.write("some data".getBytes(), f1);
  File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
  Files.write(new byte[0], f2);
  context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
      tmpDir.getAbsolutePath());
  Configurables.configure(source, context);
  source.start();
  Thread.sleep(10);
  for (int i = 0; i < 2; i++) {
    Transaction txn = channel.getTransaction();
    txn.begin();
    Event e = channel.take();
    txn.commit();
    txn.close();
  }
  Transaction txn = channel.getTransaction();
  txn.begin();
  Assert.assertNull(channel.take());
  txn.commit();
  txn.close();
}