From 5e35db370362443555ee85d18e70430191bc7f42 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:21:55 -0700 Subject: [PATCH 1/3] Adding ByteBoundedBlockingQueue to utils. --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 175 +++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala new file mode 100644 index 0000000..9c61e68 --- /dev/null +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} + +import scala.xml.Null + +/** + * A blocking queue that have size limits on both number of elements and number of bytes. + */ +class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) + extends Iterable[E] { + private val queue = new LinkedBlockingQueue[E] (queueSize) + private var currentByteSize = new AtomicInteger() + private val putLock = new Object + + /** + * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. + * The size check is only checking if the bytes in the queue reaches threshold or not. That means + * as long as current queue size is under the threshold, the element could be put into the queue, even + * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially + * lead to poor efficiency when the queue is almost full. + * @param e the element to put into the queue + * @param timeout the amount of time to wait before the expire the operation + * @param unit the time unit of timeout parameter, default to millisecond + * @return true if the element is put into queue, false if it is not + * @throws NullPointerException if element is null + */ + def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + val startTime = SystemTime.nanoseconds + val expireTime = startTime + unit.toNanos(timeout) + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + */ + def offer(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize) + putLock.wait() + success = queue.offer(e) + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Get an element from the head of queue. Wait for some time if the queue is empty. + * The poll method almost does not block on offer. + * @param timeout the amount of time to wait if the queue is empty + * @param unit the unit type + * @return the first element in the queue, null if queue is empty + */ + def poll(timeout: Long, unit: TimeUnit): E = { + var e = queue.poll(timeout, unit) + // only wake up waiting threads if the queue size drop under queueByteSize + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def poll(): E = { + var e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteSize + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Iterator for the queue + * @return Iterator for the queue + */ + override def iterator() = new Iterator[E] () { + private val iter = queue.iterator() + private var curr: E = null.asInstanceOf[E] + + def hasNext: Boolean = iter.hasNext + + def next: E = { + curr = iter.next() + curr + } + + def remove { + if (curr == null) + throw new IllegalArgumentException + iter.remove() + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + putLock.notify() + } + } + + /** + * get the number of elements in the queue + * @return number of elements in the queue + */ + override def size() = queue.size() + + /** + * get the current byte size in the queue + * @return current queue size in bytes + */ + def byteSize() = currentByteSize.get() + + /** + * get the number of unused slots in the queue + * @return the number of unused slots in the queue + */ + def remainingSize = queue.remainingCapacity() + + /** + * get the remaining bytes capacity of the queue + * @return the remaining bytes capacity of the queue + */ + def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) +} -- 1.8.3.4 (Apple Git-47) From 5abf366f6890446a689c61996ed31a3fd3f8735e Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:25:55 -0700 Subject: [PATCH 2/3] changed arguments name --- .../kafka/utils/ByteBoundedBlockingQueue.scala | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 9c61e68..f3fed3a 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -25,9 +25,9 @@ import scala.xml.Null /** * A blocking queue that have size limits on both number of elements and number of bytes. */ -class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) +class ByteBoundedBlockingQueue[E] (val queueNumMessaegCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueSize) + private val queue = new LinkedBlockingQueue[E] (queueNumMessaegCapacity) private var currentByteSize = new AtomicInteger() private val putLock = new Object @@ -50,16 +50,16 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s var success = false try { putLock synchronized { - while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + while (currentByteSize.get() >= queueByteCapacity && SystemTime.nanoseconds < expireTime) putLock.wait(expireTime - SystemTime.nanoseconds) // only proceed if queue has capacity and not timeout - if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + if (currentByteSize.get() < queueByteCapacity && SystemTime.nanoseconds < expireTime) { success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) // only increase queue byte size iff put succeeds if (success == true) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteSize) + if (currentByteSize.get() < queueByteCapacity) putLock.notify() } } @@ -80,13 +80,13 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s var success = false try { putLock synchronized { - while (currentByteSize.get() >= queueByteSize) + while (currentByteSize.get() >= queueByteCapacity) putLock.wait() success = queue.offer(e) if (success == true) currentByteSize.addAndGet(sizeFunction.get(e)) // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteSize) + if (currentByteSize.get() < queueByteCapacity) putLock.notify() } } catch { @@ -104,10 +104,10 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s */ def poll(timeout: Long, unit: TimeUnit): E = { var e = queue.poll(timeout, unit) - // only wake up waiting threads if the queue size drop under queueByteSize + // only wake up waiting threads if the queue size drop under queueByteCapacity if (e != null && - currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && - currentByteSize.get() < queueByteSize) + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) putLock.notify() e } @@ -118,9 +118,9 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s */ def poll(): E = { var e = queue.poll() - // only wake up waiting threads if the queue size drop under queueByteSize - if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && - currentByteSize.get() < queueByteSize) + // only wake up waiting threads if the queue size drop under queueByteCapacity + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && + currentByteSize.get() < queueByteCapacity) putLock.notify() e } @@ -144,7 +144,7 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s if (curr == null) throw new IllegalArgumentException iter.remove() - if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity) putLock.notify() } } @@ -171,5 +171,5 @@ class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, s * get the remaining bytes capacity of the queue * @return the remaining bytes capacity of the queue */ - def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) + def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get()) } -- 1.8.3.4 (Apple Git-47) From 8fff3b9a6c4c8f564481e6231453eef84163e6fb Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:27:29 -0700 Subject: [PATCH 3/3] correct typo. --- core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index f3fed3a..39f34a7 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -25,9 +25,9 @@ import scala.xml.Null /** * A blocking queue that have size limits on both number of elements and number of bytes. */ -class ByteBoundedBlockingQueue[E] (val queueNumMessaegCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) +class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueNumMessaegCapacity) + private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity) private var currentByteSize = new AtomicInteger() private val putLock = new Object -- 1.8.3.4 (Apple Git-47)