From 01500b25fbf34db5827553f273d463065b5f4503 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:21:55 -0700 Subject: [PATCH 1/6] Adding ByteBoundedBlockingQueue to utils. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 175 +++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala new file mode 100644 index 0000000..9c61e68 --- /dev/null +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} + +import scala.xml.Null + +/** + * A blocking queue that have size limits on both number of elements and number of bytes. + */ +class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) + extends Iterable[E] { + private val queue = new LinkedBlockingQueue[E] (queueSize) + private var currentByteSize = new AtomicInteger() + private val putLock = new Object + + /** + * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. + * The size check is only checking if the bytes in the queue reaches threshold or not. That means + * as long as current queue size is under the threshold, the element could be put into the queue, even + * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially + * lead to poor efficiency when the queue is almost full. + * @param e the element to put into the queue + * @param timeout the amount of time to wait before the expire the operation + * @param unit the time unit of timeout parameter, default to millisecond + * @return true if the element is put into queue, false if it is not + * @throws NullPointerException if element is null + */ + def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + val startTime = SystemTime.nanoseconds + val expireTime = startTime + unit.toNanos(timeout) + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + */ + def offer(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize) + putLock.wait() + success = queue.offer(e) + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Get an element from the head of queue. Wait for some time if the queue is empty. + * The poll method almost does not block on offer. + * @param timeout the amount of time to wait if the queue is empty + * @param unit the unit type + * @return the first element in the queue, null if queue is empty + */ + def poll(timeout: Long, unit: TimeUnit): E = { + var e = queue.poll(timeout, unit) + // only wake up waiting threads if the queue size drop under queueByteSize + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def poll(): E = { + var e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteSize + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Iterator for the queue + * @return Iterator for the queue + */ + override def iterator() = new Iterator[E] () { + private val iter = queue.iterator() + private var curr: E = null.asInstanceOf[E] + + def hasNext: Boolean = iter.hasNext + + def next: E = { + curr = iter.next() + curr + } + + def remove { + if (curr == null) + throw new IllegalArgumentException + iter.remove() + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + putLock.notify() + } + } + + /** + * get the number of elements in the queue + * @return number of elements in the queue + */ + override def size() = queue.size() + + /** + * get the current byte size in the queue + * @return current queue size in bytes + */ + def byteSize() = currentByteSize.get() + + /** + * get the number of unused slots in the queue + * @return the number of unused slots in the queue + */ + def remainingSize = queue.remainingCapacity() + + /** + * get the remaining bytes capacity of the queue + * @return the remaining bytes capacity of the queue + */ + def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) +} -- 1.8.3.4 (Apple Git-47) From d9af3f6c66ee96564afc27c2f2a840e519156236 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:25:55 -0700 Subject: [PATCH 2/6] changed arguments name --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 9c61e68..f3fed3a 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -25,9 +25,9 @@ import scala.xml.Null /** * A blocking queue that have size limits on both number of elements and number of bytes. */ -class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) +class ByteBoundedBlockingQueue[E] (val queueNumMessaegCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueSize) + private val queue = new LinkedBlockingQueue[E] (queueNumMessaegCapacity) private var currentByteSize = new AtomicInteger() private val putLock = new Object @@ -50,16 +50,16 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s var success = false try { putLock synchronized { - while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) putLock.wait(expireTime - SystemTime.nanoseconds) // only proceed if queue has capacity and not timeout - if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) // only increase queue byte size iff put succeeds if (success == true) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteSize) + if (currentByteSize.get() < queueByteCapacity) putLock.notify() } } @@ -80,13 +80,13 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s var success = false try { putLock synchronized { - while (currentByteSize.get() >= queueByteSize) + while (currentByteSize.get() >= queueByteCapacity) putLock.wait() success = queue.offer(e) if (success == true) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteSize) + if (currentByteSize.get() < queueByteCapacity) putLock.notify() } } catch { @@ -104,10 +104,10 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s */ def poll(timeout: Long, unit: TimeUnit): E = { var e = queue.poll(timeout, unit) - // only wake up waiting threads if the queue size drop under queueByteSize + // only wake up waiting threads if the queue size drop under queueByteCapacity if (e != null && - currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && - currentByteSize.get() < queueByteSize) + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) putLock.notify() e } @@ -118,9 +118,9 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s */ def poll(): E = { var e = queue.poll() - // only wake up waiting threads if the queue size drop under queueByteSize - if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && - currentByteSize.get() < queueByteSize) + // only wake up waiting threads if the queue size drop under queueByteCapacity + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) putLock.notify() e } @@ -144,7 +144,7 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s if (curr == null) throw new IllegalArgumentException iter.remove() - if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity) putLock.notify() } } @@ -171,5 +171,5 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s * get the remaining bytes capacity of the queue * @return the remaining bytes capacity of the queue */ - def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) + def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get()) } -- 1.8.3.4 (Apple Git-47) From b65b2b6cea643283c1f3979c5605f992f4aaa605 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:27:29 -0700 Subject: [PATCH 3/6] correct typo. --- core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index f3fed3a..39f34a7 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -25,9 +25,9 @@ import scala.xml.Null /** * A blocking queue that have size limits on both number of elements and number of bytes. */ -class ByteBoundedBlockingQueue[E] (val queueNumMessaegCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) +class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueNumMessaegCapacity) + private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity) private var currentByteSize = new AtomicInteger() private val putLock = new Object -- 1.8.3.4 (Apple Git-47) From 98294e3809bba90cc9a59cb19ab9814f338a6f22 Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 26 Oct 2014 23:42:02 -0700 Subject: [PATCH 4/6] Incorporated Joel's comments. Also fixed negative queue size problem. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 136 +++++++++++++-------- 1 file changed, 88 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 39f34a7..6732cdc 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -32,96 +32,130 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy private val putLock = new Object /** - * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. - * The size check is only checking if the bytes in the queue reaches threshold or not. That means - * as long as current queue size is under the threshold, the element could be put into the queue, even - * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially - * lead to poor efficiency when the queue is almost full. + * Please refer to [[java.util.concurrent.BlockingQueue#offer]] + * An element can be enqueued provided the current size (in number of elements) is within the configured + * capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the + * element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity. * @param e the element to put into the queue * @param timeout the amount of time to wait before the expire the operation * @param unit the time unit of timeout parameter, default to millisecond * @return true if the element is put into queue, false if it is not * @throws NullPointerException if element is null + * @throws InterruptedException if interrupted during waiting */ def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { if (e == null) throw new NullPointerException("Putting null element into queue.") val startTime = SystemTime.nanoseconds val expireTime = startTime + unit.toNanos(timeout) - var success = false - try { - putLock synchronized { - while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) - putLock.wait(expireTime - SystemTime.nanoseconds) - // only proceed if queue has capacity and not timeout - if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { - success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) - // only increase queue byte size iff put succeeds - if (success == true) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - } + putLock synchronized { + while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { + val success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + return success } - } catch { - case ie: InterruptedException => } - success + false } /** - * Put an element to the tail of the queue, block if queue is full + * Please refer to [[java.util.concurrent.BlockingQueue#offer]]. + * Put an element to the tail of the queue, return false immediately if queue is full * @param e The element to put into queue * @return true on succeed, false on failure * @throws NullPointerException if element is null + * @throws InterruptedException if interrupted during waiting */ def offer(e: E): Boolean = { if (e == null) throw new NullPointerException("Putting null element into queue.") - var success = false - try { - putLock synchronized { - while (currentByteSize.get() >= queueByteCapacity) - putLock.wait() - success = queue.offer(e) - if (success == true) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - } - } catch { - case ie: InterruptedException => + putLock synchronized { + if (currentByteSize.get() >= queueByteCapacity) + return false + val success = queue.offer(e) + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + return success + } + false + } + + /** + * Please refer to [[java.util.concurrent.BlockingQueue#put]]. + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + * @throws InterruptedException if interrupted during waiting + */ + def put(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + putLock synchronized { + if (currentByteSize.get() >= queueByteCapacity) + putLock.wait() + val success = queue.offer(e) + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + return success } - success + false } /** + * Please refer to [[java.util.concurrent.BlockingQueue#poll]] * Get an element from the head of queue. Wait for some time if the queue is empty. - * The poll method almost does not block on offer. * @param timeout the amount of time to wait if the queue is empty * @param unit the unit type * @return the first element in the queue, null if queue is empty */ def poll(timeout: Long, unit: TimeUnit): E = { - var e = queue.poll(timeout, unit) + val e = queue.poll(timeout, unit) // only wake up waiting threads if the queue size drop under queueByteCapacity if (e != null && currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && currentByteSize.get() < queueByteCapacity) - putLock.notify() + putLock.synchronized(putLock.notify()) e } /** - * Get an element from the head of the queue, block if the queue is empty + * Please refer to [[java.util.concurrent.BlockingQueue#poll]] + * Get an element from the head of queue. * @return the first element in the queue, null if queue is empty */ def poll(): E = { - var e = queue.poll() + val e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteCapacity + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) + putLock.synchronized(putLock.notify()) + e + } + + /** + * Please refer to [[java.util.concurrent.BlockingQueue#take]] + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def take(): E = { + val e = queue.take() // only wake up waiting threads if the queue size drop under queueByteCapacity if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && currentByteSize.get() < queueByteCapacity) - putLock.notify() + putLock.synchronized(putLock.notify()) e } @@ -142,10 +176,10 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy def remove { if (curr == null) - throw new IllegalArgumentException + throw new IllegalStateException("Iterator does not have a current element.") iter.remove() if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity) - putLock.notify() + putLock.synchronized(putLock.notify()) } } @@ -159,7 +193,13 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy * get the current byte size in the queue * @return current queue size in bytes */ - def byteSize() = currentByteSize.get() + def byteSize() = { + val currSize = currentByteSize.get() + // There is a potential race where after an element is put into the queue and before the size is added to + // currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize, + // in that case, currentByteSize would become negative, in that case, just put the queue size to be 0. + if (currSize > 0) currSize else 0 + } /** * get the number of unused slots in the queue -- 1.8.3.4 (Apple Git-47) From bb626064c954650ea03f611d7240e40d13326d58 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 27 Oct 2014 18:33:39 -0700 Subject: [PATCH 5/6] Incorporated Joel's comments. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 33 +++++++++++----------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 6732cdc..71ad45b 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -20,8 +20,6 @@ package kafka.utils import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} -import scala.xml.Null - /** * A blocking queue that have size limits on both number of elements and number of bytes. */ @@ -53,16 +51,17 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy // only proceed if queue has capacity and not timeout if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { val success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) - // only increase queue byte size iff put succeeds + // only increase queue byte size if put succeeds if (success) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting if (currentByteSize.get() < queueByteCapacity) putLock.notify() - return success + success + } else { + false } } - false } /** @@ -76,17 +75,18 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy def offer(e: E): Boolean = { if (e == null) throw new NullPointerException("Putting null element into queue.") putLock synchronized { - if (currentByteSize.get() >= queueByteCapacity) - return false - val success = queue.offer(e) - if (success) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - return success + if (currentByteSize.get() >= queueByteCapacity) { + false + } else { + val success = queue.offer(e) + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + success + } } - false } /** @@ -108,9 +108,8 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy // wake up another thread in case multiple threads are waiting if (currentByteSize.get() < queueByteCapacity) putLock.notify() - return success + success } - false } /** -- 1.8.3.4 (Apple Git-47) From f32e953e742740eba1955b4093873c95d6c1c891 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 29 Oct 2014 10:57:15 -0700 Subject: [PATCH 6/6] Added unit test for ByteBoundedBlockingQueue. Fixed a bug regarding wating time. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 13 ++- .../kafka/utils/ByteBoundedBlockingQueueTest.scala | 103 +++++++++++++++++++++ 2 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 71ad45b..cade48d 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -45,12 +45,17 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy if (e == null) throw new NullPointerException("Putting null element into queue.") val startTime = SystemTime.nanoseconds val expireTime = startTime + unit.toNanos(timeout) + var timeoutNanos = expireTime - startTime putLock synchronized { - while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) - putLock.wait(expireTime - SystemTime.nanoseconds) + timeoutNanos = expireTime - SystemTime.nanoseconds + while (currentByteSize.get() >= queueByteCapacity && timeoutNanos > 0) { + putLock.wait(timeoutNanos / 1000000, (timeoutNanos % 1000000).toInt) + timeoutNanos = expireTime - SystemTime.nanoseconds + } // only proceed if queue has capacity and not timeout - if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { - val success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + timeoutNanos = expireTime - SystemTime.nanoseconds + if (currentByteSize.get() < queueByteCapacity && timeoutNanos > 0) { + val success = queue.offer(e, timeoutNanos, TimeUnit.NANOSECONDS) // only increase queue byte size if put succeeds if (success) currentByteSize.addAndGet(sizeFunction.get(e)) diff --git a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala new file mode 100644 index 0000000..c9fccca --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.utils + +import java.util.concurrent.TimeUnit + +import junit.framework.Assert._ +import org.junit.{Test} +import kafka.utils.ByteBoundedBlockingQueue + +class ByteBoundedBlockingQueueTest { + val sizeFunction = (a: String) => a.length + val queue = new ByteBoundedBlockingQueue[String](5, 15, Some(sizeFunction)) + + @Test + def testByteBoundedBlockingQueue() { + assertEquals(0, queue.size()) + assertEquals(0, queue.byteSize()) + assertEquals(5, queue.queueNumMessageCapacity) + assertEquals(15, queue.queueByteCapacity) + assertEquals(5, queue.remainingSize) + assertEquals(15, queue.remainingByteSize) + + //offer a message whose size is smaller than remaining capacity + val m0 = new String("0123456789") + assertEquals(true, queue.offer(m0)) + assertEquals(1, queue.size()) + assertEquals(10, queue.byteSize()) + assertEquals(4, queue.remainingSize) + assertEquals(5, queue.remainingByteSize) + + // offer a message where remaining capacity < message size < capacity limit + val m1 = new String("1234567890") + assertEquals(true, queue.offer(m1)) + assertEquals(2, queue.size()) + assertEquals(20, queue.byteSize()) + assertEquals(3, queue.remainingSize) + assertEquals(0, queue.remainingByteSize) + + // offer a message using timeout, should fail because no space is left + val m2 = new String("2345678901") + assertEquals(false, queue.offer(m2, 10, TimeUnit.MILLISECONDS)) + assertEquals(2, queue.size()) + assertEquals(20, queue.byteSize()) + assertEquals(3, queue.remainingSize) + assertEquals(0, queue.remainingByteSize) + + // take an element out of the queue + assertEquals("0123456789", queue.take()) + assertEquals(1, queue.size()) + assertEquals(10, queue.byteSize()) + assertEquals(4, queue.remainingSize) + assertEquals(5, queue.remainingByteSize) + + // add 5 small elements into the queue, first 4 should succeed, the 5th one should fail + // test put() + assertEquals(true, queue.put("a")) + assertEquals(true, queue.offer("b")) + assertEquals(true, queue.offer("c")) + assertEquals(4, queue.size()) + assertEquals(13, queue.byteSize()) + assertEquals(1, queue.remainingSize) + assertEquals(2, queue.remainingByteSize) + + assertEquals(true, queue.offer("d")) + assertEquals(5, queue.size()) + assertEquals(14, queue.byteSize()) + assertEquals(0, queue.remainingSize) + assertEquals(1, queue.remainingByteSize) + + assertEquals(false, queue.offer("e")) + assertEquals(5, queue.size()) + assertEquals(14, queue.byteSize()) + assertEquals(0, queue.remainingSize) + assertEquals(1, queue.remainingByteSize) + + // try take 6 elements out of the queue, the last poll() should fail as there is no element anymore + // test take() + assertEquals("1234567890", queue.poll(10, TimeUnit.MILLISECONDS)) + // test poll + assertEquals("a", queue.poll()) + assertEquals("b", queue.poll()) + assertEquals("c", queue.poll()) + assertEquals("d", queue.poll()) + assertEquals(null, queue.poll(10, TimeUnit.MILLISECONDS)) + } + +} -- 1.8.3.4 (Apple Git-47)