diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index 132abf5..dca90b8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.BoundedArrayQueue; import com.google.common.annotations.VisibleForTesting; @@ -63,6 +64,8 @@ public class BoundedByteBufferPool { // For reporting private AtomicLong allocations = new AtomicLong(0); + private ReentrantLock lock = new ReentrantLock(); + /** * @param maxByteBufferSizeToCache * @param initialByteBufferSize @@ -72,15 +75,23 @@ public class BoundedByteBufferPool { final int maxToCache) { this.maxByteBufferSizeToCache = maxByteBufferSizeToCache; this.runningAverage = initialByteBufferSize; - this.buffers = new ArrayBlockingQueue(maxToCache, true); + this.buffers = new BoundedArrayQueue(maxToCache); } public ByteBuffer getBuffer() { - ByteBuffer bb = this.buffers.poll(); + ByteBuffer bb = null; + lock.lock(); + try { + bb = this.buffers.poll(); + if (bb != null) { + this.totalReservoirCapacity -= bb.capacity(); + } + } finally { + lock.unlock(); + } if (bb != null) { - // Clear sets limit == capacity. Postion == 0. + // Clear sets limit == capacity. Postion == 0. bb.clear(); - this.totalReservoirCapacity -= bb.capacity(); } else { bb = ByteBuffer.allocate(this.runningAverage); this.allocations.incrementAndGet(); @@ -96,15 +107,21 @@ public class BoundedByteBufferPool { public void putBuffer(ByteBuffer bb) { // If buffer is larger than we want to keep around, just let it go. if (bb.capacity() > this.maxByteBufferSizeToCache) return; - if (!this.buffers.offer(bb)) { + boolean success = false; + int average = 0; + lock.lock(); + try { + success = this.buffers.offer(bb); + if (success) { + this.totalReservoirCapacity += bb.capacity(); + average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0. + } + } finally { + lock.unlock(); + } + if (!success) { LOG.warn("At capacity: " + this.buffers.size()); } else { - int size = this.buffers.size(); // This size may be inexact. - this.totalReservoirCapacity += bb.capacity(); - int average = 0; - if (size != 0) { - average = this.totalReservoirCapacity / size; - } if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) { this.runningAverage = average; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java new file mode 100644 index 0000000..5c7f1fa --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedArrayQueue.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.AbstractQueue; +import java.util.Iterator; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A bounded non-thread safe implementation of {@link java.util.Queue}. + */ +@InterfaceAudience.Private +public class BoundedArrayQueue extends AbstractQueue { + + private Object[] items; + private int takeIndex, putIndex; + private int count; + + public BoundedArrayQueue(int maxElements) { + items = new Object[maxElements]; + } + + @Override + public int size() { + return count; + } + + /** + * Not implemented and will throw {@link UnsupportedOperationException} + */ + @Override + public Iterator iterator() { + // We don't need this. Leaving it as not implemented. + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(E e) { + if (count == items.length) return false; + items[putIndex] = e; + if (++putIndex == items.length) putIndex = 0; + count++; + return true; + } + + @Override + public E poll() { + return (count == 0) ? null : dequeue(); + } + + @SuppressWarnings("unchecked") + private E dequeue() { + E x = (E) items[takeIndex]; + items[takeIndex] = null; + if (++takeIndex == items.length) takeIndex = 0; + count--; + return x; + } + + @SuppressWarnings("unchecked") + @Override + public E peek() { + return (E) items[takeIndex]; + } +}