Details
Type: Bug
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version/s: 1.7.0
Fix Version/s: None
Description
Taildir source doesn't call stop() on graceful shutdown.
Test configuration:
source: taildir
channel: PseudoTxnMemoryChannel / flume-kafka-channel
sink: none
I found that Flume sometimes doesn't terminate when the Taildir source is in use.
I had to kill the process to terminate it.
The tailFileProcess() function in TaildirSource.java contains an infinite loop.
When the process is interrupted, a ChannelException is thrown, but the catch block only sleeps and then continues, so the exception never breaks the loop.
I think that's the reason why the Taildir source never reaches its stop() function.
TaildirSource.java
private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
    throws IOException, InterruptedException {
  while (true) {
    reader.setCurrentFile(tf);
    List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
    if (events.isEmpty()) {
      break;
    }
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    try {
      getChannelProcessor().processEventBatch(events);
      reader.commit();
    } catch (ChannelException ex) {
      logger.warn("The channel is full or unexpected failure. "
          + "The source will try again after " + retryInterval + " ms");
      TimeUnit.MILLISECONDS.sleep(retryInterval);
      retryInterval = retryInterval << 1;
      retryInterval = Math.min(retryInterval, maxRetryInterval);
      continue;
    }
    retryInterval = 1000;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    if (events.size() < batchSize) {
      break;
    }
  }
}
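To illustrate the suspected problem and one possible way around it, here is a minimal standalone sketch. It is not the Flume code and not a proposed patch. It assumes the source could set a flag when shutdown is requested and check that flag on every retry. The class RetryLoopSketch, the BatchSink interface, the nested ChannelException, the requestStop() method and the short retry intervals are all hypothetical stand-ins chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class RetryLoopSketch {
    /** Hypothetical stand-in for Flume's ChannelException. */
    static class ChannelException extends RuntimeException {
        ChannelException(String msg) { super(msg); }
    }

    /** Hypothetical stand-in for the channel processor. */
    interface BatchSink {
        void processEventBatch(List<String> events);
    }

    // Assumption: set by the component's stop path so the loop can see it.
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private long retryInterval = 10;          // ms, kept short for the demo
    private final long maxRetryInterval = 40; // ms

    void requestStop() { stopRequested.set(true); }

    /** Returns true if the batch was delivered, false if the loop gave up. */
    boolean deliver(List<String> events, BatchSink sink) {
        // Unlike while (true), the condition is re-checked after every failure.
        while (!stopRequested.get()) {
            try {
                sink.processEventBatch(events);
                retryInterval = 10; // reset backoff after a success
                return true;
            } catch (ChannelException ex) {
                try {
                    TimeUnit.MILLISECONDS.sleep(retryInterval);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return false;
                }
                retryInterval = Math.min(retryInterval << 1, maxRetryInterval);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RetryLoopSketch sketch = new RetryLoopSketch();
        int[] attempts = {0};
        // A sink that always fails and asks for a stop on the third attempt.
        boolean delivered = sketch.deliver(List.of("event"), events -> {
            attempts[0]++;
            if (attempts[0] == 3) {
                sketch.requestStop();
            }
            throw new ChannelException("channel full");
        });
        System.out.println("delivered=" + delivered + ", attempts=" + attempts[0]);
    }
}
```

With a sink that always throws, the original while (true) shape would retry forever. In this sketch the loop ends once the flag is set or the thread is interrupted, which would give a caller the chance to proceed to stop().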