Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.7.0
Fix Version/s: None
Environment: CentOS Linux release 7.2.1511 (Core), java version "1.7.0_80"
Flags: Patch
Description
Test configuration:
source - taildir
interceptor - a custom interceptor that drops some events
channel - any
sink - none
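I found that the taildir source sleeps frequently.
The tailFileProcess() function in TaildirSource.java breaks out of its loop when events.size() < batchSize, but an interceptor may change events.size() by dropping events, so the loop can exit while the file still has unread data.
I think events.size() should be captured before interceptor processing, and that captured value used for the check.
This avoids the unnecessary sleep.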
TaildirSource.java
private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
    throws IOException, InterruptedException {
  long receivedSize = 0;
  while (true) {
    reader.setCurrentFile(tf);
    List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
    if (events.isEmpty()) {
      break;
    }
    // Capture the batch size before interceptors run; they may drop events.
    receivedSize = events.size();
    sourceCounter.addToEventReceivedCount(receivedSize);
    sourceCounter.incrementAppendBatchReceivedCount();
    try {
      getChannelProcessor().processEventBatch(events);
      reader.commit();
    } catch (ChannelException ex) {
      logger.warn("The channel is full or unexpected failure. " +
          "The source will try again after " + retryInterval + " ms");
      // Back off, doubling the interval up to maxRetryInterval, then retry.
      TimeUnit.MILLISECONDS.sleep(retryInterval);
      retryInterval = retryInterval << 1;
      retryInterval = Math.min(retryInterval, maxRetryInterval);
      continue;
    }
    retryInterval = 1000;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    // Compare the pre-interceptor size: a short read means no more data for now.
    if (receivedSize < batchSize) {
      break;
    }
  }
}
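
The effect described above can be reproduced without Flume. The following is a minimal sketch, not Flume code: BatchLoopSketch, readEvents, intercept, drain and sampleFile are hypothetical stand-ins, and the interceptor is assumed to drop events from the list in place. It uses removeIf, so it needs Java 8 or later. It counts how many lines the loop reads before it exits, depending on whether the size check uses the value taken before or after interception.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class BatchLoopSketch {

    // Hypothetical stand-in for reader.readEvents(): takes up to batchSize lines.
    static List<String> readEvents(Deque<String> file, int batchSize) {
        List<String> events = new ArrayList<>();
        while (events.size() < batchSize && !file.isEmpty()) {
            events.add(file.poll());
        }
        return events;
    }

    // Hypothetical interceptor (assumption): drops DEBUG events in place.
    static void intercept(List<String> events) {
        events.removeIf(e -> e.startsWith("DEBUG"));
    }

    // Returns how many lines were read from the file before the loop exited.
    static int drain(Deque<String> file, int batchSize, boolean sizeBeforeInterceptor) {
        int totalRead = 0;
        while (true) {
            List<String> events = readEvents(file, batchSize);
            if (events.isEmpty()) {
                break;
            }
            int receivedSize = events.size(); // size before interception
            totalRead += receivedSize;
            intercept(events);                // may shrink the list
            int sizeToCheck = sizeBeforeInterceptor ? receivedSize : events.size();
            if (sizeToCheck < batchSize) {
                break;
            }
        }
        return totalRead;
    }

    // Ten lines, alternating DEBUG and INFO.
    static Deque<String> sampleFile() {
        Deque<String> file = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            file.add((i % 2 == 0 ? "DEBUG " : "INFO ") + i);
        }
        return file;
    }

    public static void main(String[] args) {
        System.out.println(drain(sampleFile(), 4, false)); // prints 4
        System.out.println(drain(sampleFile(), 4, true));  // prints 10
    }
}
```

With a batch size of 4, the post-interceptor check exits after the first batch because only 2 of the 4 events survive, leaving 6 lines unread. The pre-interceptor check keeps reading until a genuinely short batch arrives.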