Details
Bug
Status: Open
Major
Resolution: Unresolved
1.5.0, 1.6.0
None
None
Description
The memory channel acquires permits from the bytesRemaining semaphore when a source puts events and commits them to the channel, but it does not release those permits once the source's commit succeeds. As a result, the channel stops accepting events for a while, and the following exception occurs:
2015-07-08 07:52:06,089 | WARN | [pool-4-thread-1] | The channel is full, and cannot write data now. The source will try again after 500 milliseconds | org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 9.69943E8reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
I modified the doCommit method as shown below, and with this change the exception no longer occurs:
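@Override
protected void doCommit() throws InterruptedException {
  int remainingChange = takeList.size() - putList.size();
  if (remainingChange < 0) {
    // Reserve byte capacity for the events being put.
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
      throw new ChannelException("Cannot commit transaction. Byte capacity " +
          "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
          "reached. Please increase heap space/byte capacity allocated to " +
          "the channel as the sinks may not be keeping up with the sources");
    }
    // Reserve queue slots; give the bytes back if no slots are available.
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
      bytesRemaining.release(putByteCounter);
      throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
          " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
  }
  int puts = putList.size();
  int takes = takeList.size();
  synchronized (queueLock) {
    if (puts > 0) {
      while (!putList.isEmpty()) {
        if (!queue.offer(putList.removeFirst())) {
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
      // Added line: release the put permits once the events are in the queue.
      bytesRemaining.release(putByteCounter);
    }
    putList.clear();
    takeList.clear();
  }
  bytesRemaining.release(takeByteCounter);
  takeByteCounter = 0;
  putByteCounter = 0;
  queueStored.release(puts);
  if (remainingChange > 0) {
    queueRemaining.release(remainingChange);
  }
  if (puts > 0) {
    channelCounter.addToEventPutSuccessCount(puts);
  }
  if (takes > 0) {
    channelCounter.addToEventTakeSuccessCount(takes);
  }
  channelCounter.setChannelSize(queue.size());
}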
I added the line "bytesRemaining.release(putByteCounter);" immediately after the events are moved from putList into the queue.
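
To make the permit accounting described above easier to reason about, here is a minimal, self-contained sketch of a byte-bounded queue guarded by a java.util.concurrent.Semaphore. It is not Flume's MemoryChannel: the class ByteBudgetSketch and its put, take and freeBytes methods are hypothetical names introduced only for illustration, and the model assumes that bytes are reserved when an event is put and returned when it is taken.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of a byte-bounded channel (not Flume code). */
class ByteBudgetSketch {
    // One permit per byte of remaining capacity.
    private final Semaphore bytesRemaining;
    private final Deque<byte[]> queue = new ArrayDeque<>();

    ByteBudgetSketch(int byteCapacity) {
        this.bytesRemaining = new Semaphore(byteCapacity);
    }

    /** Reserves body.length permits, waiting up to timeoutMillis; false means "full". */
    boolean put(byte[] body, long timeoutMillis) {
        try {
            if (!bytesRemaining.tryAcquire(body.length, timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // no byte budget left within the timeout
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
        synchronized (queue) {
            queue.addLast(body);
        }
        return true;
    }

    /** Removes the oldest event, if any, and returns its bytes to the budget. */
    byte[] take() {
        byte[] body;
        synchronized (queue) {
            body = queue.pollFirst();
        }
        if (body != null) {
            bytesRemaining.release(body.length);
        }
        return body;
    }

    /** Number of bytes that can still be reserved. */
    int freeBytes() {
        return bytesRemaining.availablePermits();
    }

    public static void main(String[] args) {
        ByteBudgetSketch channel = new ByteBudgetSketch(10);
        System.out.println(channel.put(new byte[6], 10)); // true: 6 of 10 bytes reserved
        System.out.println(channel.put(new byte[6], 10)); // false: only 4 bytes free
        channel.take();                                   // returns 6 bytes to the budget
        System.out.println(channel.put(new byte[6], 10)); // true again
    }
}
```

In this model a put is rejected whenever the events still held in the queue use up the byte budget, which is the situation the "channel is full" warning reports. Where the permits are released (at put commit or at take commit) determines whether the semaphore bounds the bytes currently stored in the queue or only the bytes in flight during a commit.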