Flume / FLUME-1685

ExecSource shouldn't die if the channel is full


    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.2.0, 1.3.0, 1.4.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels:


      Imagine this scenario: you are using the ExecSource to tail a file and send its lines to a file channel. When the file channel fills up because of a temporary issue downstream, the source gets a ChannelException, which kills the source.

      2012-10-31 20:45:57,872 ERROR source.ExecSource: Failed while running command: tail -F /tmp/test.log
      org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel test { dataDirs: [/tmp/test/data] }
              at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:195)
              at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:275)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
              at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
              at java.util.concurrent.FutureTask.run(FutureTask.java:138)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              at java.lang.Thread.run(Thread.java:662)
      Caused by: org.apache.flume.ChannelException: Cannot acquire capacity. [channel=hbasejson]
              at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:346)
              at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
              at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:76)
              at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:184)
              ... 7 more

      The situation where the command being 'exec'ed fails/exits is already handled with the existing retry logic.

      I suggest that when the source gets a ChannelException, it throws the event away (since there is nowhere to put it), sleeps for one second, and then loops again for another event. If the channel still throws an exception (still full), that event is dropped too, the sleep time is doubled, and we repeat. There should be an upper bound on the retry delay (say 128 seconds, about 2 minutes). Once putEvent no longer throws a ChannelException, the "fallback" mode is reset and we read records at full speed again.
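
      The retry behavior proposed above could be sketched roughly as follows. This is an illustration only, not the patch and not Flume code: BackoffPutter, its constructor parameters, and the injected sleeper are all hypothetical names, and a plain Consumer that throws an unchecked exception stands in for the real channel put.

```java
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/**
 * Hypothetical helper (not part of Flume): drops an event when the channel
 * rejects it, then waits with a doubling, capped delay before the caller
 * reads the next event.
 */
final class BackoffPutter<E> {
    private final Consumer<E> channelPut;  // stand-in for the real channel put
    private final LongConsumer sleeper;    // injected so tests need not really sleep
    private final long initialDelayMs;
    private final long maxDelayMs;
    private long delayMs;                  // delay to use on the next failure

    BackoffPutter(Consumer<E> channelPut, LongConsumer sleeper,
                  long initialDelayMs, long maxDelayMs) {
        this.channelPut = channelPut;
        this.sleeper = sleeper;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.delayMs = initialDelayMs;
    }

    /** Returns true if the event was accepted, false if it was dropped. */
    boolean put(E event) {
        try {
            channelPut.accept(event);
            delayMs = initialDelayMs;      // success: back to full speed
            return true;
        } catch (RuntimeException channelFull) {
            // Assumption: the channel signals "full" with an unchecked exception.
            sleeper.accept(delayMs);       // the event is dropped; wait before the next one
            delayMs = Math.min(delayMs * 2, maxDelayMs);
            return false;
        }
    }
}
```

      With an initial delay of 1000 ms and a cap of 128000 ms, consecutive failures would wait 1, 2, 4, ... up to 128 seconds, and the first successful put resets the delay to 1 second. In a real source, the sleeper would presumably wrap Thread.sleep and the catch clause would be narrowed to ChannelException.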

      Clearly, when the channel is full, data loss will happen. But with this change we wouldn't have to restart the agent, which at scale is an administrative pain. Even detecting a dead source is difficult, because the Flume agent itself is still running. And in this case (running a 'tail'), a stopped source will eventually lose data anyway once the file being tailed rotates. Something has to give somewhere.

      I've got a patch I'm working on for this, but wanted to get the JIRA rolling first.




            • Assignee:
              hoffman60613 Steve Hoffman


              • Created: