From 91330f8e1de4f1abb6b21a6ff054b51e15462fe0 Mon Sep 17 00:00:00 2001
From: becketqin
Date: Tue, 23 Sep 2014 18:09:14 -0700
Subject: [PATCH] mirror maker redesign; adding byte bounded blocking queue.

---
 .../consumer/ZookeeperConsumerConnector.scala      |  16 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  | 292 +++++++++++++++++----
 .../kafka/utils/ByteBoundedBlockingQueue.scala     | 158 +++++++++++
 3 files changed, 413 insertions(+), 53 deletions(-)
 create mode 100644 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..6e1de37 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -285,17 +285,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  def commitOffsets(isAutoCommit: Boolean = true) {
+  def commitOffsets(isAutoCommit: Boolean = true) = {
+    commitOffsets(isAutoCommit, null)
+  }
+
+  def commitOffsets(isAutoCommit: Boolean,
+                    topicPartitionOffsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]) {
     var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
     var done = false
 
     while (!done) {
       val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
-        val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
+        val offsetsToCommit = if (topicPartitionOffsets == null) {immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
           partitionTopicInfos.map { case (partition, info) =>
             TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
           }
-        }.toSeq:_*)
+        }.toSeq:_*)} else topicPartitionOffsets
 
         if (offsetsToCommit.size > 0) {
           if (config.offsetsStorage == "zookeeper") {
@@ -702,7 +707,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         case Some(f) =>
           f.stopConnections
           clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
-          info("Committing all offsets after clearing the fetcher queues")
           /**
            * here, we need to commit offsets before stopping the consumer from returning any more messages
            * from the current data chunk. Since partition ownership is not yet released, this commit offsets
@@ -711,8 +715,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
            * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
            * successfully and the fetchers restart to fetch more data chunks
            **/
-          if (config.autoCommitEnable)
+          if (config.autoCommitEnable) {
+            info("Committing all offsets after clearing the fetcher queues")
             commitOffsets()
+          }
         case None =>
       }
     }
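Note, not part of the patch: a minimal sketch of how the new commitOffsets overload is meant to be
driven. The connector value, topic and offsets below are hypothetical; the OffsetCommitThread added
to MirrorMaker.scala further down builds its map the same way.

    import kafka.common.{TopicAndPartition, OffsetAndMetadata}

    // connector is an existing ZookeeperConsumerConnector instance (hypothetical here).
    // Commit an explicit offset map instead of the offsets tracked in topicRegistry;
    // the committed value is the next offset to be consumed.
    val offsetsToCommit = scala.collection.immutable.Map(
      TopicAndPartition("someTopic", 0) -> OffsetAndMetadata(43L, null))
    // isAutoCommit = false enables the configured commit retries; passing null offsets
    // falls back to the original behavior of committing the topicRegistry offsets.
    connector.commitOffsets(false, offsetsToCommit)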
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b8698ee..0d651e1 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,28 +17,39 @@
 
 package kafka.tools
 
-import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
+import java.lang.Thread.UncaughtExceptionHandler
+
+import com.yammer.metrics.core.Gauge
+import kafka.common.{TopicAndPartition, OffsetAndMetadata}
+import kafka.utils._
 import kafka.consumer._
 import kafka.serializer._
-import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
+import kafka.producer.{OldProducer, NewShinyProducer}
 import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord}
 
 import scala.collection.JavaConversions._
 
 import joptsimple.OptionParser
 
-import java.util.Random
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
+import java.util.{Properties}
+import java.util.concurrent.atomic.{AtomicBoolean}
+import java.util.concurrent._
 
 object MirrorMaker extends Logging {
 
   private var connectors: Seq[ZookeeperConsumerConnector] = null
   private var consumerThreads: Seq[ConsumerThread] = null
   private var producerThreads: Seq[ProducerThread] = null
+  private var offsetCommitThread: OffsetCommitThread = null
+  private val isCleanShutdown: AtomicBoolean = new AtomicBoolean(false)
+
+  private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long]
+  private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] =
+    new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory))
 
-  private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
+  private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes)
 
   def main(args: Array[String]) {
 
@@ -82,6 +93,13 @@ object MirrorMaker extends Logging {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(10000)
 
+    val bufferByteSizeOpt = parser.accepts("queue.byte.size",
+      "Number of bytes that are buffered between the consumer and producer")
+      .withRequiredArg()
+      .describedAs("Queue size in terms of number of bytes")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100000000)
+
     val whitelistOpt = parser.accepts("whitelist",
       "Whitelist of topics to mirror.")
       .withRequiredArg()
@@ -94,6 +112,12 @@ object MirrorMaker extends Logging {
       .describedAs("Java regex (String)")
      .ofType(classOf[String])
 
+    val offsetCommitIntervalMsOpt = parser.accepts("offsetCommitIntervalMs",
+      "Interval at which to commit offsets")
+      .withRequiredArg()
+      .describedAs("offset commit interval in milliseconds")
+      .ofType(classOf[java.lang.Integer])
+
     val helpOpt = parser.accepts("help", "Print this message.")
 
     if(args.length == 0)
@@ -115,6 +139,8 @@ object MirrorMaker extends Logging {
     val numProducers = options.valueOf(numProducersOpt).intValue()
     val numStreams = options.valueOf(numStreamsOpt).intValue()
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
+    val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue()
+    val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
 
     // create consumer streams
     connectors = options.valuesOf(consumerConfigOpt).toList
@@ -123,7 +149,7 @@ object MirrorMaker extends Logging {
     val numConsumers = connectors.size * numStreams
 
     // create a data channel btw the consumers and the producers
-    val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
+    val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numConsumers, numProducers)
 
     // create producer threads
     val useNewProducer = options.has(useNewProducerOpt)
@@ -133,9 +159,9 @@ object MirrorMaker extends Logging {
       producerProps.setProperty("client.id", clientId + "-" + i)
       val producer =
         if (useNewProducer)
-          new NewShinyProducer(producerProps)
+          new MirrorMakerNewProducer(producerProps)
         else
-          new OldProducer(producerProps)
+          new MirrorMakerOldProducer(producerProps)
       new ProducerThread(mirrorDataChannel, producer, i)
     })
 
@@ -164,6 +190,11 @@ object MirrorMaker extends Logging {
     consumerThreads.foreach(_.start)
     producerThreads.foreach(_.start)
 
+    // create offset commit thread
+    if (useNewProducer) {
+      offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs)
+      offsetCommitThread.start()
+    }
 
     // we wait on producer's shutdown latch instead of consumers
     // since the consumer threads can hit a timeout/other exception;
@@ -173,22 +204,31 @@ object MirrorMaker extends Logging {
   }
 
   def cleanShutdown() {
-    if (connectors != null) connectors.foreach(_.shutdown)
-    if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
-    if (producerThreads != null) {
-      producerThreads.foreach(_.shutdown)
-      producerThreads.foreach(_.awaitShutdown)
+    if (isCleanShutdown.compareAndSet(false, true)) {
+      info("Start clean shutdown.")
+      info("Shutting down producer threads.")
+      if (producerThreads != null) {
+        producerThreads.foreach(_.shutdown)
+        producerThreads.foreach(_.awaitShutdown)
+      }
+      info("Shutting down offset commit thread.")
+      if (offsetCommitThread != null) {
+        offsetCommitThread.shutdown
+        offsetCommitThread.awaitShutdown
+      }
+      info("Shutting down consumer threads.")
+      if (connectors != null) connectors.foreach(_.shutdown)
+      if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
+      info("Kafka mirror maker shutdown successfully")
     }
-    info("Kafka mirror maker shutdown successfully")
   }
 
-  class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
+  class DataChannel(capacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) extends KafkaMetricsGroup {
 
-    val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers)
-    for (i <- 0 until numConsumers)
-      queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
-
-    private val counter = new AtomicInteger(new Random().nextInt())
+    val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numProducers)
+    val sizeFunction = (record: MirrorMakerRecord) => record.size
+    for (i <- 0 until numProducers)
+      queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](capacity, byteCapacity, Some(sizeFunction))
 
     // We use a single meter for aggregated wait percentage for the data channel.
    // Since meter is calculated as total_recorded_value / time_window and
@@ -197,16 +237,17 @@ object MirrorMaker extends Logging {
     private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
     private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
     private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
-
-    def put(record: ProducerRecord) {
-      // If the key of the message is empty, use round-robin to select the queue
-      // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue
-      val queueId =
-        if(record.key() != null) {
-          Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers
-        } else {
-          Utils.abs(counter.getAndIncrement()) % numConsumers
-        }
+    private val channelByteSizeHist = newHistogram("MirrorMaker-DataChannel-Byte-Size")
+
+    def put(record: MirrorMakerRecord, qid: Int = -1) {
+      // If a queue id is specified (only when producer thread shutdown is called), use qid,
+      // otherwise use the hash of topic+partition to decide which queue to put the record in.
+      val queueId = {
+        if (qid == -1)
+          Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numProducers
+        else
+          qid
+      }
       val queue = queues(queueId)
 
       var putSucceed = false
@@ -215,18 +256,20 @@ object MirrorMaker extends Logging {
         putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
         waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
       }
-      channelSizeHist.update(queue.size)
+      channelSizeHist.update(queues.map(q => q.size()).sum)
+      channelByteSizeHist.update(queues.map(q => q.byteSize()).sum)
     }
 
-    def take(queueId: Int): ProducerRecord = {
+    def take(queueId: Int): MirrorMakerRecord = {
       val queue = queues(queueId)
-      var data: ProducerRecord = null
+      var data: MirrorMakerRecord = null
       while (data == null) {
        val startTakeTime = SystemTime.nanoseconds
        data = queue.poll(500, TimeUnit.MILLISECONDS)
        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
       }
-      channelSizeHist.update(queue.size)
+      channelSizeHist.update(queues.map(q => q.size()).sum)
+      channelByteSizeHist.update(queues.map(q => q.byteSize()).sum)
       data
     }
   }
@@ -246,15 +289,21 @@ object MirrorMaker extends Logging {
 
      info("Starting mirror maker consumer thread " + threadName)
      try {
        for (msgAndMetadata <- stream) {
-          val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+          val data = new MirrorMakerRecord(msgAndMetadata.topic,
+                                           msgAndMetadata.partition,
+                                           msgAndMetadata.offset,
+                                           msgAndMetadata.key,
+                                           msgAndMetadata.message)
          mirrorDataChannel.put(data)
        }
      } catch {
-        case e: Throwable =>
-          fatal("Stream unexpectedly exited.", e)
+        case t: Throwable =>
+          fatal("Consumer thread %s unexpectedly exited.".format(this.getName), t)
      } finally {
        shutdownLatch.countDown()
        info("Consumer thread stopped")
+        if (isCleanShutdown.compareAndSet(false, true))
+          System.exit(-1)
      }
    }
@@ -269,7 +318,7 @@ object MirrorMaker extends Logging {
   }
 
   class ProducerThread (val dataChannel: DataChannel,
-                        val producer: BaseProducer,
+                        val producer: MirrorMakerBaseProducer,
                         val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
     private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
@@ -277,32 +326,37 @@ object MirrorMaker extends Logging {
 
     setName(threadName)
 
-    override def run {
+    override def run() {
      info("Starting mirror maker producer thread " + threadName)
      try {
        while (true) {
-          val data: ProducerRecord = dataChannel.take(threadId)
-          trace("Sending message with value size %d".format(data.value().size))
+          val data: MirrorMakerRecord = dataChannel.take(threadId)
+          trace("Sending message with value size %d".format(data.value.size))
          if(data eq shutdownMessage) {
            info("Received shutdown message")
            return
          }
-          producer.send(data.topic(), data.key(), data.value())
+          producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition),
+                        data.sourceOffset,
+                        data.key,
+                        data.value)
        }
      } catch {
-        case t: Throwable => {
-          fatal("Producer thread failure due to ", t)
-        }
+        case t: Throwable =>
+          fatal("Producer thread %s failure due to ".format(this.getName), t)
      } finally {
        shutdownComplete.countDown
        info("Producer thread stopped")
+        // if it exits accidentally, stop the entire mirror maker
+        if (isCleanShutdown.compareAndSet(false, true))
+          System.exit(-1)
      }
    }
 
    def shutdown {
      try {
        info("Producer thread " + threadName + " shutting down")
-        dataChannel.put(shutdownMessage)
+        dataChannel.put(shutdownMessage, threadId)
      } catch {
        case ie: InterruptedException => {
@@ -323,5 +377,147 @@ object MirrorMaker extends Logging {
       }
     }
   }
+
+  class OffsetCommitThread(commitIntervalMs: Int) extends Thread with Logging with KafkaMetricsGroup {
+    private val threadName = "mirrormaker-offset-commit-thread"
+    private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+    this.logIdent = "[%s]".format(threadName)
+    var shutdownFlag: Boolean = false
+    var commitCounter: Int = 0
+
+    this.setName(threadName)
+
+    newGauge("MirrorMaker-Offset-Commit-Counter",
+      new Gauge[Int] {
+        def value = commitCounter
+      })
+
+    /**
+     * The offset commit thread uses the first ZookeeperConsumerConnector in connectors. It is based on the
+     * assumption that all the consumer instances in mirror maker are in the same consumer group. The impact of this
+     * approach is that:
+     * 1. If offsets are committed to ZooKeeper, the checkpointedZkOffsets in the first ZookeeperConsumerConnector
+     *    could be tainted by partitions that it does not own. But this problem exists today anyway since we are not
+     *    cleaning checkpointedZkOffsets on consumer rebalance.
+     * 2. The offset commit could potentially slow down the first ZookeeperConsumerConnector if the offset commit
+     *    is too big (which is not likely to happen).
+     * Ideally, committing the offsets of a partition should be done by the ZookeeperConsumerConnector instance that
+     * consumes from that partition, but the implementation gets tricky when a consumer rebalance occurs. It would
+     * require either updating the offset map in mirror maker on consumer rebalance or keeping a separate offset map
+     * in each ZookeeperConsumerConnector, which does not seem worth the complexity.
+     */
+    override def run() {
+      info("Starting mirror maker offset commit thread")
+      try {
+        while (!shutdownFlag) {
+          Thread.sleep(commitIntervalMs)
+          commitOffset
+        }
+      } catch {
+        case t: Throwable => fatal("Offset commit thread exited due to ", t)
+      } finally {
+        swallow(commitOffset)
+        shutdownComplete.countDown()
+        info("Offset commit thread exited")
+        if (isCleanShutdown.compareAndSet(false, true))
+          System.exit(-1)
+      }
+    }
+
+    private def commitOffset {
+      val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map {
+        case (topicPartition, partitionOffsetMap) =>
+          topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null)
+      }.toSeq: _*)
+      trace("Committing offsets: %s".format(offsetsToCommit))
+      connectors.headOption match {
+        case None =>
+          warn("No consumer available to commit offsets.")
+        case Some(connector) =>
+          connector.commitOffsets(false, offsetsToCommit)
+          commitCounter += 1
+      }
+    }
+
+    private def getOffsetToCommit(offsetsMap: Pool[Int, Long]): Long = {
+      val offsets = offsetsMap.map(_._2).toSeq.sorted
+      val iter = offsets.iterator
+      var offsetToCommit = iter.next()
+      while (iter.hasNext && offsetToCommit + 1 == iter.next())
+        offsetToCommit += 1
+      offsetToCommit + 1
+    }
+
+    def shutdown {
+      shutdownFlag = true
+    }
+
+    def awaitShutdown {
+      try {
+        shutdownComplete.await
+        info("Offset commit thread shutdown complete")
+      } catch {
+        case ie: InterruptedException => {
+          warn("Shutdown of the offset commit thread interrupted")
+        }
+      }
+    }
+  }
+
+  private[kafka] trait MirrorMakerBaseProducer {
+    def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte])
+    def close()
+  }
+
+  private class MirrorMakerNewProducer (val producerProps: Properties)
+          extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer {
+
+    override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
+      val record = new ProducerRecord(topicPartition.topic, key, value)
+      if(sync) {
+        topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset)
+      } else {
+        this.producer.send(record,
+          new MirrorMakerProducerCallback(topicPartition, offset, key, value))
+      }
+    }
+  }
+
+  private class MirrorMakerOldProducer (val producerProps: Properties)
+          extends OldProducer(producerProps) with MirrorMakerBaseProducer {
+
+    override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
+      super.send(topicPartition.topic, key, value)
+    }
+
+    override def close() {
+      super.close()
+    }
+  }
+
+  private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition,
+                                             val offset: Long,
+                                             val key: Array[Byte],
+                                             val value: Array[Byte])
+          extends ErrorLoggingCallback(topicPartition.topic, key, value, false) {
+
+    override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+      if (exception != null) {
+        super.onCompletion(metadata, exception)
+      } else {
+        info("updating offset:[%s] -> %d".format(topicPartition, offset))
+        topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset)
+      }
+    }
+  }
+
+  private[kafka] class MirrorMakerRecord (val sourceTopic: String,
+                                          val sourcePartition: Int,
+                                          val sourceOffset: Long,
+                                          val key: Array[Byte],
+                                          val value: Array[Byte]) {
+    def size = value.length + {if (key == null) 0 else key.length}
+  }
+
 }
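Note, not part of the patch: a standalone illustration of the contiguous-offset logic in
getOffsetToCommit, assuming the per-partition Pool holds the offsets of individually acknowledged
messages. The values 5, 6 and 8 are made up; offset 7 is still in flight.

    val acked = Seq(5L, 6L, 8L).sorted
    val iter = acked.iterator
    var offsetToCommit = iter.next()      // start at 5
    while (iter.hasNext && offsetToCommit + 1 == iter.next())
      offsetToCommit += 1                 // advances to 6, then stops at the gap before 8
    println(offsetToCommit + 1)           // prints 7, the next offset to consume, per Kafka's commit convention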
diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
new file mode 100644
index 0000000..2d2a00e
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
@@ -0,0 +1,158 @@
+package kafka.utils
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
+
+/**
+ * A blocking queue that has size limits on both the number of elements and the number of bytes.
+ */
+class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int])
+    extends Iterable[E] {
+  private val queue = new LinkedBlockingQueue[E](queueSize)
+  private val currentByteSize = new AtomicInteger()
+  private val putLock = new Object
+
+  /**
+   * Put an element at the tail of the queue. Wait for a certain amount of time if the queue is full.
+   * The size check only tests whether the number of bytes in the queue has reached the threshold. That means
+   * as long as the current byte size is under the threshold, the element can be put into the queue, even if
+   * the queue byte size then ends up above the threshold. A strict size check is easy but could potentially
+   * lead to poor efficiency when the queue is almost full.
+   * @param e the element to put into the queue
+   * @param timeout the amount of time to wait before the operation expires
+   * @param unit the time unit of the timeout parameter, defaults to microseconds
+   * @return true if the element was put into the queue, false if it was not
+   * @throws NullPointerException if the element is null
+   */
+  def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = {
+    if (e == null) throw new NullPointerException("Putting null element into queue.")
+    val startTime = SystemTime.nanoseconds
+    val expireTime = startTime + unit.toNanos(timeout)
+    var success = false
+    try {
+      putLock synchronized {
+        while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime)
+          putLock.wait(math.max(1L, TimeUnit.NANOSECONDS.toMillis(expireTime - SystemTime.nanoseconds)))
+        // only proceed if the queue has byte capacity left and we have not timed out
+        if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) {
+          success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS)
+          // only increase the queue byte size if the put succeeds
+          if (success)
+            currentByteSize.addAndGet(sizeFunction.get(e))
+          // wake up another thread in case multiple threads are waiting
+          if (currentByteSize.get() < queueByteSize)
+            putLock.notify()
+        }
+      }
+    } catch {
+      case ie: InterruptedException =>
+    }
+    success
+  }
+
+  /**
+   * Put an element at the tail of the queue, blocking while the queue byte size is at its limit.
+   * (The element-count bound is enforced by the underlying queue without blocking.)
+   * @param e the element to put into the queue
+   * @return true on success, false otherwise
+   * @throws NullPointerException if the element is null
+   */
+  def offer(e: E): Boolean = {
+    if (e == null) throw new NullPointerException("Putting null element into queue.")
+    var success = false
+    try {
+      putLock synchronized {
+        while (currentByteSize.get() >= queueByteSize)
+          putLock.wait()
+        success = queue.offer(e)
+        if (success)
+          currentByteSize.addAndGet(sizeFunction.get(e))
+        // wake up another thread in case multiple threads are waiting
+        if (currentByteSize.get() < queueByteSize)
+          putLock.notify()
+      }
+    } catch {
+      case ie: InterruptedException =>
+    }
+    success
+  }
+
+  /**
+   * Get an element from the head of the queue. Wait for some time if the queue is empty.
+   * A poll rarely blocks on a concurrent offer.
+   * @param timeout the amount of time to wait if the queue is empty
+   * @param unit the time unit of the timeout parameter
+   * @return the first element in the queue, or null if the queue is empty
+   */
+  def poll(timeout: Long, unit: TimeUnit): E = {
+    val e = queue.poll(timeout, unit)
+    // only wake up waiting threads if the byte size drops below queueByteSize
+    if (e != null &&
+        currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize &&
+        currentByteSize.get() < queueByteSize)
+      putLock synchronized { putLock.notify() }
+    e
+  }
+
+  /**
+   * Get an element from the head of the queue without blocking.
+   * @return the first element in the queue, or null if the queue is empty
+   */
+  def poll(): E = {
+    val e = queue.poll()
+    // only wake up waiting threads if the byte size drops below queueByteSize
+    if (e != null &&
+        currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize &&
+        currentByteSize.get() < queueByteSize)
+      putLock synchronized { putLock.notify() }
+    e
+  }
+
+  /**
+   * Iterator over the elements in the queue
+   * @return an iterator over the elements in the queue
+   */
+  override def iterator() = new Iterator[E] () {
+    private val iter = queue.iterator()
+    private var curr: E = null.asInstanceOf[E]
+
+    def hasNext: Boolean = iter.hasNext
+
+    def next: E = {
+      curr = iter.next()
+      curr
+    }
+
+    def remove {
+      if (curr == null)
+        throw new IllegalStateException("next() has not been called")
+      iter.remove()
+      if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize)
+        putLock synchronized { putLock.notify() }
+    }
+  }
+
+  /**
+   * Get the number of elements in the queue
+   * @return the number of elements in the queue
+   */
+  override def size() = queue.size()
+
+  /**
+   * Get the current byte size of the queue
+   * @return the current queue size in bytes
+   */
+  def byteSize() = currentByteSize.get()
+
+  /**
+   * Get the number of unused element slots in the queue
+   * @return the number of unused slots in the queue
+   */
+  def remainingSize = queue.remainingCapacity()
+
+  /**
+   * Get the remaining byte capacity of the queue
+   * @return the remaining byte capacity of the queue
+   */
+  def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get())
+}
--
1.8.3.4 (Apple Git-47)
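Note, not part of the patch: a minimal usage sketch of the new ByteBoundedBlockingQueue, mirroring
how the DataChannel wires it up. The Message type and the bounds below are made up for illustration.

    import java.util.concurrent.TimeUnit
    import kafka.utils.ByteBoundedBlockingQueue

    case class Message(payload: Array[Byte])

    // bounded to at most 1000 elements and roughly 1 MB, sized by payload length
    val sizeFunction = (m: Message) => m.payload.length
    val queue = new ByteBoundedBlockingQueue[Message](1000, 1024 * 1024, Some(sizeFunction))

    // offer gives up after the timeout if the byte bound is still exhausted
    queue.offer(Message(new Array[Byte](512)), 500, TimeUnit.MILLISECONDS)
    // poll returns null if nothing arrives within the timeout
    val m = queue.poll(500, TimeUnit.MILLISECONDS)
    println((queue.size(), queue.byteSize()))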