  1. Flume
  2. FLUME-2240

Add Throttling to spool directory source

Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Environment: linux debian cdh4.4.0

    Description

      As a user, I would like to replay a large file using the spool directory source without creating too much back pressure on the underlying channel.

      We tried to set up a recovery procedure using the spool directory source, and it appears that our Avro sink cannot handle events fast enough, so the channel throws the following exception:

      19 Nov 2013 12:32:10,100 ERROR [pool-10-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)  - FATAL: Spool Directory source recovery: { spoolDir: /data/flumeng/recovery }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
      org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: recovery}
              at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
              at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:440)
              at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:909)
              at java.lang.Thread.run(Thread.java:662)
      Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
              at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
              at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
              at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
              ... 10 more
      19 Nov 2013 12:32:35,008 INFO  [Log-BackgroundWorker-events] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:214)  - Start checkpoint for /data/flumeng/checkpoint/checkpoint, elements to sync = 16
      

      Am I missing something, or would it be a good solution to add some kind of throttling to the loop that consumes events from files, so that the user can manage the throughput?
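
      As a rough illustration of the kind of throttling meant here, the sketch below paces batches to a maximum event rate. It is not taken from Flume: `EventThrottle`, `maxEventsPerSecond`, `reserve`, and `acquire` are hypothetical names, and the rate would presumably come from a new source configuration property that does not exist today.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Flume): paces batches to a maximum event rate. */
final class EventThrottle {
    private final long nanosPerEvent;
    private long nextFreeNanos;

    EventThrottle(long maxEventsPerSecond) {
        this(maxEventsPerSecond, System.nanoTime());
    }

    // Start time is injectable so the pacing logic can be checked without sleeping.
    EventThrottle(long maxEventsPerSecond, long startNanos) {
        if (maxEventsPerSecond <= 0) {
            throw new IllegalArgumentException("maxEventsPerSecond must be positive");
        }
        this.nanosPerEvent = TimeUnit.SECONDS.toNanos(1) / maxEventsPerSecond;
        this.nextFreeNanos = startNanos;
    }

    /**
     * Reserves a slot for a batch and returns how many nanoseconds the caller
     * should wait before sending it (0 if it may be sent immediately).
     */
    synchronized long reserve(int batchSize, long nowNanos) {
        long wait = nextFreeNanos - nowNanos;
        if (wait < 0) {
            wait = 0; // we are behind schedule, so no waiting is needed
        }
        // The next batch may start once this one has "used up" its share of time.
        nextFreeNanos = nowNanos + wait + batchSize * nanosPerEvent;
        return wait;
    }

    /** Blocks until a batch of the given size is allowed to be sent. */
    void acquire(int batchSize) throws InterruptedException {
        long wait = reserve(batchSize, System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }
}
```

      Under these assumptions, the source loop would call `throttle.acquire(events.size())` just before handing each batch to `ChannelProcessor.processEventBatch`, so the read rate stays below what the sink can drain instead of filling the channel.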

      Depending on your advice, I am available to submit a patch.

          People

            Assignee: Unassigned
            Reporter: David Stendardi (dstendardi)
            Votes: 1
            Watchers: 3

              Time Tracking

                Estimated: 6h
                Remaining: 6h
                Logged: Not Specified