Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(revision 1204274)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(working copy)
@@ -18,13 +18,15 @@
 package kafka.producer.async
 
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, ScheduledThreadPoolExecutor, ScheduledExecutorService, BlockingQueue}
+import java.util.concurrent.BlockingQueue
+import org.apache.log4j.Logger
+import kafka.utils.Utils
 
-class AsyncProducerStats[T](queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerStatsMBean {
+class AsyncProducerStats extends AsyncProducerStatsMBean {
   val droppedEvents = new AtomicInteger(0)
   val numEvents = new AtomicInteger(0)
 
-  def getAsyncProducerQueueSize: Int = queue.size
+  def getAsyncProducerEvents: Int = numEvents.get
 
   def getAsyncProducerDroppedEvents: Int = droppedEvents.get
 
@@ -32,3 +34,17 @@
 
   def recordEvent = numEvents.getAndAdd(1)
 }
+
+class AsyncProducerQueueSizeStats[T](private val queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerQueueSizeStatsMBean {
+  def getAsyncProducerQueueSize: Int = queue.size
+}
+
+object AsyncProducerStats {
+  private val logger = Logger.getLogger(getClass())
+  private val stats = new AsyncProducerStats
+  Utils.swallow(logger.warn, Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName))
+
+  def recordDroppedEvents = stats.recordDroppedEvents
+
+  def recordEvent = stats.recordEvent
+}
Index: core/src/main/scala/kafka/producer/async/AsyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducer.scala	(revision 1204274)
+++ core/src/main/scala/kafka/producer/async/AsyncProducer.scala	(working copy)
@@ -23,8 +23,6 @@
 import org.apache.log4j.{Level, Logger}
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
-import java.lang.management.ManagementFactory
-import javax.management.ObjectName
 import java.util.{Random, Properties}
 import kafka.producer.{ProducerConfig, SyncProducer}
 
@@ -32,6 +30,7 @@
   val Shutdown = new Object
   val Random = new Random
   val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
+  val ProducerQueueSizeMBeanName = "kafka.producer.Producer:type=AsyncProducerQueueSizeStats"
 }
 
 private[kafka] class AsyncProducer[T](config: AsyncProducerConfig,
@@ -49,23 +48,15 @@
   eventHandler.init(eventHandlerProps)
   if(cbkHandler != null)
     cbkHandler.init(cbkHandlerProps)
-  private val sendThread = new ProducerSendThread("ProducerSendThread-" + AsyncProducer.Random.nextInt, queue,
+  private val asyncProducerID = AsyncProducer.Random.nextInt
+  private val sendThread = new ProducerSendThread("ProducerSendThread-" + asyncProducerID, queue,
     serializer, producer,
     if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler),
     cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
   sendThread.setDaemon(false)
+  Utils.swallow(logger.warn, Utils.registerMBean(
+    new AsyncProducerQueueSizeStats[T](queue), AsyncProducer.ProducerQueueSizeMBeanName + "-" + asyncProducerID))
 
-  val asyncProducerStats = new AsyncProducerStats[T](queue)
-  val mbs = ManagementFactory.getPlatformMBeanServer
-  try {
-    val objName = new ObjectName(AsyncProducer.ProducerMBeanName)
-    if(mbs.isRegistered(objName))
-      mbs.unregisterMBean(objName)
-    mbs.registerMBean(asyncProducerStats, objName)
-  }catch {
-    case e: Exception => logger.warn("can't register AsyncProducerStats")
-  }
-
   def this(config: AsyncProducerConfig) {
     this(config,
          new SyncProducer(config),
@@ -81,7 +72,7 @@
   def send(topic: String, event: T) { send(topic, event, ProducerRequest.RandomPartition) }
 
   def send(topic: String, event: T, partition:Int) {
-    asyncProducerStats.recordEvent
+    AsyncProducerStats.recordEvent
 
     if(closed.get)
      throw new QueueClosedException("Attempt to add event to a closed queue.")
@@ -116,7 +107,7 @@
       cbkHandler.afterEnqueue(data, added)
 
     if(!added) {
-      asyncProducerStats.recordDroppedEvents
+      AsyncProducerStats.recordDroppedEvents
       logger.error("Event queue is full of unsent messages, could not send event: " + event.toString)
       throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
     }else {
Index: core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala	(revision 1204274)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala	(working copy)
@@ -18,6 +18,10 @@
 package kafka.producer.async
 
 trait AsyncProducerStatsMBean {
-  def getAsyncProducerQueueSize: Int
+  def getAsyncProducerEvents: Int
   def getAsyncProducerDroppedEvents: Int
 }
+
+trait AsyncProducerQueueSizeStatsMBean {
+  def getAsyncProducerQueueSize: Int
+}
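
Note for reviewers: below is a minimal, self-contained Scala sketch of the register-or-replace JMX pattern that the inline block removed from AsyncProducer performed, and that Utils.registerMBean/Utils.swallow presumably encapsulate. The DemoStats types and the registerOrReplace helper are illustrative stand-ins, not part of this patch.

import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Standard MBean naming convention: interface named <Class>MBean in the same package.
trait DemoStatsMBean { def getEvents: Int }
class DemoStats extends DemoStatsMBean { def getEvents: Int = 0 }

object MBeanDemo {
  // Register mbean under name, first unregistering any stale instance left by a
  // previous producer; swallow and log failures so JMX trouble cannot break the
  // caller (the patch routes such failures to logger.warn via Utils.swallow).
  def registerOrReplace(mbean: AnyRef, name: String) {
    try {
      val mbs = ManagementFactory.getPlatformMBeanServer
      val objName = new ObjectName(name)
      if(mbs.isRegistered(objName))
        mbs.unregisterMBean(objName)
      mbs.registerMBean(mbean, objName)
    } catch {
      case e: Exception => println("can't register MBean " + name + ": " + e)
    }
  }

  def main(args: Array[String]) {
    // One MBean per producer instance, as the patch does for queue-size stats:
    val asyncProducerID = new java.util.Random().nextInt
    registerOrReplace(new DemoStats, "kafka.producer.Producer:type=DemoStats-" + asyncProducerID)
  }
}

The design point of the patch itself: the cumulative event/drop counters are now registered once, in the AsyncProducerStats companion object, so constructing a second producer no longer silently replaces the shared MBean; only the queue-size MBean remains per-instance, disambiguated by the asyncProducerID suffix on its ObjectName.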