From 5e35db370362443555ee85d18e70430191bc7f42 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:21:55 -0700 Subject: [PATCH 1/4] Adding ByteBoundedBlockingQueue to utils. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 175 +++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala new file mode 100644 index 0000000..9c61e68 --- /dev/null +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} + +import scala.xml.Null + +/** + * A blocking queue that have size limits on both number of elements and number of bytes. + */ +class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) + extends Iterable[E] { + private val queue = new LinkedBlockingQueue[E] (queueSize) + private var currentByteSize = new AtomicInteger() + private val putLock = new Object + + /** + * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. + * The size check is only checking if the bytes in the queue reaches threshold or not. That means + * as long as current queue size is under the threshold, the element could be put into the queue, even + * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially + * lead to poor efficiency when the queue is almost full. + * @param e the element to put into the queue + * @param timeout the amount of time to wait before the expire the operation + * @param unit the time unit of timeout parameter, default to millisecond + * @return true if the element is put into queue, false if it is not + * @throws NullPointerException if element is null + */ + def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + val startTime = SystemTime.nanoseconds + val expireTime = startTime + unit.toNanos(timeout) + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + */ + def offer(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize) + putLock.wait() + success = queue.offer(e) + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Get an element from the head of queue. Wait for some time if the queue is empty. + * The poll method almost does not block on offer. + * @param timeout the amount of time to wait if the queue is empty + * @param unit the unit type + * @return the first element in the queue, null if queue is empty + */ + def poll(timeout: Long, unit: TimeUnit): E = { + var e = queue.poll(timeout, unit) + // only wake up waiting threads if the queue size drop under queueByteSize + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def poll(): E = { + var e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteSize + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Iterator for the queue + * @return Iterator for the queue + */ + override def iterator() = new Iterator[E] () { + private val iter = queue.iterator() + private var curr: E = null.asInstanceOf[E] + + def hasNext: Boolean = iter.hasNext + + def next: E = { + curr = iter.next() + curr + } + + def remove { + if (curr == null) + throw new IllegalArgumentException + iter.remove() + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + putLock.notify() + } + } + + /** + * get the number of elements in the queue + * @return number of elements in the queue + */ + override def size() = queue.size() + + /** + * get the current byte size in the queue + * @return current queue size in bytes + */ + def byteSize() = currentByteSize.get() + + /** + * get the number of unused slots in the queue + * @return the number of unused slots in the queue + */ + def remainingSize = queue.remainingCapacity() + + /** + * get the remaining bytes capacity of the queue + * @return the remaining bytes capacity of the queue + */ + def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) +} -- 1.8.3.4 (Apple Git-47) From 5abf366f6890446a689c61996ed31a3fd3f8735e Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:25:55 -0700 Subject: [PATCH 2/4] changed arguments name --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 9c61e68..f3fed3a 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -25,9 +25,9 @@ import scala.xml.Null /** * A blocking queue that have size limits on both number of elements and number of bytes. */ -class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) +class ByteBoundedBlockingQueue[E] (val queueNumMessaegCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueSize) + private val queue = new LinkedBlockingQueue[E] (queueNumMessaegCapacity) private var currentByteSize = new AtomicInteger() private val putLock = new Object @@ -50,16 +50,16 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s var success = false try { putLock synchronized { - while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) putLock.wait(expireTime - SystemTime.nanoseconds) // only proceed if queue has capacity and not timeout - if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) // only increase queue byte size iff put succeeds if (success == true) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteSize) + if (currentByteSize.get() < queueByteCapacity) putLock.notify() } } @@ -80,13 +80,13 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s var success = false try { putLock synchronized { - while (currentByteSize.get() >= queueByteSize) + while (currentByteSize.get() >= queueByteCapacity) putLock.wait() success = queue.offer(e) if (success == true) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteSize) + if (currentByteSize.get() < queueByteCapacity) putLock.notify() } } catch { @@ -104,10 +104,10 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s */ def poll(timeout: Long, unit: TimeUnit): E = { var e = queue.poll(timeout, unit) - // only wake up waiting threads if the queue size drop under queueByteSize + // only wake up waiting threads if the queue size drop under queueByteCapacity if (e != null && - currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && - currentByteSize.get() < queueByteSize) + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) putLock.notify() e } @@ -118,9 +118,9 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s */ def poll(): E = { var e = queue.poll() - // only wake up waiting threads if the queue size drop under queueByteSize - if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && - currentByteSize.get() < queueByteSize) + // only wake up waiting threads if the queue size drop under queueByteCapacity + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) putLock.notify() e } @@ -144,7 +144,7 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s if (curr == null) throw new IllegalArgumentException iter.remove() - if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity) putLock.notify() } } @@ -171,5 +171,5 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s * get the remaining bytes capacity of the queue * @return the remaining bytes capacity of the queue */ - def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) + def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get()) } -- 1.8.3.4 (Apple Git-47) From 8fff3b9a6c4c8f564481e6231453eef84163e6fb Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:27:29 -0700 Subject: [PATCH 3/4] correct typo. --- core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index f3fed3a..39f34a7 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -25,9 +25,9 @@ import scala.xml.Null /** * A blocking queue that have size limits on both number of elements and number of bytes. */ -class ByteBoundedBlockingQueue[E] (val queueNumMessaegCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) +class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueNumMessaegCapacity) + private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity) private var currentByteSize = new AtomicInteger() private val putLock = new Object -- 1.8.3.4 (Apple Git-47) From 99a990e0f7aeaa362ef58dacb925c1d024cccecd Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 26 Oct 2014 23:42:02 -0700 Subject: [PATCH 4/4] Incorporated Joel's comments. Also fixed negative queue size problem. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 136 +++++++++++++-------- 1 file changed, 88 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 39f34a7..6732cdc 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -32,96 +32,130 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy private val putLock = new Object /** - * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. - * The size check is only checking if the bytes in the queue reaches threshold or not. That means - * as long as current queue size is under the threshold, the element could be put into the queue, even - * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially - * lead to poor efficiency when the queue is almost full. + * Please refer to [[java.util.concurrent.BlockingQueue#offer]] + * An element can be enqueued provided the current size (in number of elements) is within the configured + * capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the + * element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity. * @param e the element to put into the queue * @param timeout the amount of time to wait before the expire the operation * @param unit the time unit of timeout parameter, default to millisecond * @return true if the element is put into queue, false if it is not * @throws NullPointerException if element is null + * @throws InterruptedException if interrupted during waiting */ def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { if (e == null) throw new NullPointerException("Putting null element into queue.") val startTime = SystemTime.nanoseconds val expireTime = startTime + unit.toNanos(timeout) - var success = false - try { - putLock synchronized { - while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) - putLock.wait(expireTime - SystemTime.nanoseconds) - // only proceed if queue has capacity and not timeout - if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { - success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) - // only increase queue byte size iff put succeeds - if (success == true) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - } + putLock synchronized { + while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { + val success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + return success } - } catch { - case ie: InterruptedException => } - success + false } /** - * Put an element to the tail of the queue, block if queue is full + * Please refer to [[java.util.concurrent.BlockingQueue#offer]]. + * Put an element to the tail of the queue, return false immediately if queue is full * @param e The element to put into queue * @return true on succeed, false on failure * @throws NullPointerException if element is null + * @throws InterruptedException if interrupted during waiting */ def offer(e: E): Boolean = { if (e == null) throw new NullPointerException("Putting null element into queue.") - var success = false - try { - putLock synchronized { - while (currentByteSize.get() >= queueByteCapacity) - putLock.wait() - success = queue.offer(e) - if (success == true) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - } - } catch { - case ie: InterruptedException => + putLock synchronized { + if (currentByteSize.get() >= queueByteCapacity) + return false + val success = queue.offer(e) + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + return success + } + false + } + + /** + * Please refer to [[java.util.concurrent.BlockingQueue#put]]. + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + * @throws InterruptedException if interrupted during waiting + */ + def put(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + putLock synchronized { + if (currentByteSize.get() >= queueByteCapacity) + putLock.wait() + val success = queue.offer(e) + if (success) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteCapacity) + putLock.notify() + return success } - success + false } /** + * Please refer to [[java.util.concurrent.BlockingQueue#poll]] * Get an element from the head of queue. Wait for some time if the queue is empty. - * The poll method almost does not block on offer. * @param timeout the amount of time to wait if the queue is empty * @param unit the unit type * @return the first element in the queue, null if queue is empty */ def poll(timeout: Long, unit: TimeUnit): E = { - var e = queue.poll(timeout, unit) + val e = queue.poll(timeout, unit) // only wake up waiting threads if the queue size drop under queueByteCapacity if (e != null && currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && currentByteSize.get() < queueByteCapacity) - putLock.notify() + putLock.synchronized(putLock.notify()) e } /** - * Get an element from the head of the queue, block if the queue is empty + * Please refer to [[java.util.concurrent.BlockingQueue#poll]] + * Get an element from the head of queue. * @return the first element in the queue, null if queue is empty */ def poll(): E = { - var e = queue.poll() + val e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteCapacity + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) + putLock.synchronized(putLock.notify()) + e + } + + /** + * Please refer to [[java.util.concurrent.BlockingQueue#take]] + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def take(): E = { + val e = queue.take() // only wake up waiting threads if the queue size drop under queueByteCapacity if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && currentByteSize.get() < queueByteCapacity) - putLock.notify() + putLock.synchronized(putLock.notify()) e } @@ -142,10 +176,10 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy def remove { if (curr == null) - throw new IllegalArgumentException + throw new IllegalStateException("Iterator does not have a current element.") iter.remove() if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity) - putLock.notify() + putLock.synchronized(putLock.notify()) } } @@ -159,7 +193,13 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy * get the current byte size in the queue * @return current queue size in bytes */ - def byteSize() = currentByteSize.get() + def byteSize() = { + val currSize = currentByteSize.get() + // There is a potential race where after an element is put into the queue and before the size is added to + // currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize, + // in that case, currentByteSize would become negative, in that case, just put the queue size to be 0. + if (currSize > 0) currSize else 0 + } /** * get the number of unused slots in the queue -- 1.8.3.4 (Apple Git-47)