Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-2739

Memory channel does not release the put permits from the bytesRemaining semaphore

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.5.0, 1.6.0
    • None
    • Channel
    • None

    Description

      The memory channel apply the permit when the source put and commit the event into the memory channel,but it does not release the permits from the bytesRemaining semaphore when the source commits successfully,It will not accepet the event for a while.it will occurs the following exceptions:
      015-07-08 07:52:06,089 | WARN | [pool-4-thread-1] | The channel is full, and cannot write data now. The source will try again after 500 milliseconds | org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
      org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel

      {name: mem_channel}

      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
      at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 9.69943E8reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
      at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
      at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
      at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)

      I modify the doCommit method ,and like this ,it does not occur exception:
      @Override
      protected void doCommit() throws InterruptedException {
      int remainingChange = takeList.size() - putList.size();
      if(remainingChange < 0) {
      if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
      TimeUnit.SECONDS))

      { throw new ChannelException("Cannot commit transaction. Byte capacity " + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources"); }

      if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS))

      { bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); }

      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized(queueLock) {
      if(puts > 0 ) {
      while(!putList.isEmpty()) {
      if(!queue.offer(putList.removeFirst()))

      { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); }

      }
      bytesRemaining.release(putByteCounter);
      }
      putList.clear();
      takeList.clear();
      }
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;

      queueStored.release(puts);
      if(remainingChange > 0)

      { queueRemaining.release(remainingChange); }

      if (puts > 0)

      { channelCounter.addToEventPutSuccessCount(puts); }

      if (takes > 0)

      { channelCounter.addToEventTakeSuccessCount(takes); }

      channelCounter.setChannelSize(queue.size());
      }

      I add the code "bytesRemaining.release(putByteCounter);" after the event take the putList to queue.

      Attachments

        Activity

          People

            Unassigned Unassigned
            yinghua_zh yinghua_zh
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: