Index: core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala	(revision 0)
+++ core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala	(revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer.async
+
+/**
+ * Indicates that the producer event queue is in an illegal state
+ */
+class IllegalQueueStateException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
Index: core/src/main/scala/kafka/producer/async/AsyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducer.scala	(revision 1160539)
+++ core/src/main/scala/kafka/producer/async/AsyncProducer.scala	(working copy)
@@ -90,27 +90,27 @@
     if(cbkHandler != null)
       data = cbkHandler.beforeEnqueue(data)
 
-    val added = if (config.enqueueTimeoutMs != 0) {
-      try {
-        if (config.enqueueTimeoutMs < 0) {
-          queue.put(data)
-          true
+    val added = config.enqueueTimeoutMs match {
+      case 0 =>
+        queue.offer(data)
+      case _ =>
+        try {
+          config.enqueueTimeoutMs < 0 match {
+            case true =>
+              queue.put(data)
+              true
+            case _ =>
+              queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+          }
        }
-        else {
-          queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+        catch {
+          case e: InterruptedException =>
+            val msg = "%s interrupted during enqueue of event %s.".format(
+              getClass.getSimpleName, event.toString)
+            logger.error(msg)
+            throw new AsyncProducerInterruptedException(msg)
        }
-      }
-      catch {
-        case e: InterruptedException =>
-          val msg = "%s interrupted during enqueue of event %s.".format(
-            getClass.getSimpleName, event.toString)
-          logger.error(msg)
-          throw new AsyncProducerInterruptedException(msg)
-      }
    }
-    else {
-      queue.offer(data)
-    }
 
     if(cbkHandler != null)
       cbkHandler.afterEnqueue(data, added)
@@ -121,7 +121,7 @@
       throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
     }else {
       if(logger.isTraceEnabled) {
-        logger.trace("Added event to send queue for topic: " + topic + ":" + event.toString)
+        logger.trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
         logger.trace("Remaining queue size: " + queue.remainingCapacity)
       }
     }
@@ -132,11 +132,13 @@
       cbkHandler.close
       logger.info("Closed the callback handler")
     }
 
+    closed.set(true)
     queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
+    if(logger.isDebugEnabled)
+      logger.debug("Added shutdown command to the queue")
     sendThread.shutdown
     sendThread.awaitShutdown
     producer.close
-    closed.set(true)
     logger.info("Closed AsyncProducer")
   }
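The enqueue change above replaces the nested if/else with a match on queue.enqueue.timeout.ms, mapping it onto the three insertion modes of the bounded BlockingQueue. A minimal standalone sketch of that dispatch (the queue, enqueueTimeoutMs, and tryEnqueue names here are illustrative stand-ins, not the actual AsyncProducer members):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object EnqueueTimeoutSketch {
  // Hypothetical stand-ins for the producer's bounded queue and the
  // queue.enqueue.timeout.ms setting; neither name comes from the patch.
  val queue = new LinkedBlockingQueue[String](10000)
  val enqueueTimeoutMs: Int = 500

  // Same three-way dispatch as the patched enqueue:
  //   0  -> non-blocking offer, fails immediately when the queue is full
  //   <0 -> blocking put, waits indefinitely for space
  //   >0 -> timed offer, waits up to the timeout and then gives up
  def tryEnqueue(event: String): Boolean = enqueueTimeoutMs match {
    case 0 =>
      queue.offer(event)
    case t if t < 0 =>
      queue.put(event) // blocks until space is available
      true
    case t =>
      queue.offer(event, t, TimeUnit.MILLISECONDS)
  }

  def main(args: Array[String]): Unit =
    println("enqueued: " + tryEnqueue("hello"))
}

A zero timeout keeps send() non-blocking at the cost of rejected events under load; a negative timeout trades that for potentially unbounded blocking, which is why the patch translates an interrupt into AsyncProducerInterruptedException rather than swallowing it.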
logger.info("Closed AsyncProducer") } Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1160539) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package kafka.producer.async @@ -39,6 +39,10 @@ if(cbkHandler != null) processedEvents = cbkHandler.beforeSendingData(events) + if(logger.isTraceEnabled) + processedEvents.foreach(event => logger.trace("Handling event for Topic: %s, Partition: %d" + .format(event.getTopic, event.getPartition))) + send(serialize(collate(processedEvents), serializer), syncProducer) } @@ -54,7 +58,6 @@ private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]], serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = { val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l)))) - val topicsAndPartitions = eventsPerTopic.map(e => e._1) /** enforce the compressed.topics config here. * If the compression codec is anything other than NoCompressionCodec, * Enable compression only for specified topics if any @@ -66,25 +69,29 @@ ((topicAndEvents._1._1, topicAndEvents._1._2), config.compressionCodec match { case NoCompressionCodec => - if(logger.isDebugEnabled) - logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size)) + if(logger.isTraceEnabled) + logger.trace("Sending %d messages with no compression to topic %s on partition %d" + .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2)) new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*) case _ => config.compressedTopics.size match { case 0 => - if(logger.isDebugEnabled) - logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec)) + if(logger.isTraceEnabled) + logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d" + .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2)) new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) case _ => if(config.compressedTopics.contains(topicAndEvents._1._1)) { - if(logger.isDebugEnabled) - logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec)) + if(logger.isTraceEnabled) + logger.trace("Sending %d messages with compression %d to topic %s on partition %d" + .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, config.compressionCodec.codec)) new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) } else { - if(logger.isDebugEnabled) - logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s" - .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString)) + if(logger.isTraceEnabled) + logger.trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" + .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, topicAndEvents._1._1, + config.compressedTopics.toString)) new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*) } } @@ -104,8 +111,8 @@ 
@@ -104,8 +111,8 @@
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
         val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
-        if(topicPartitionEvents.size > 0) 
-          collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
+        if(topicPartitionEvents.size > 0)
+          collatedEvents += ((topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents
Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(revision 1160539)
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(working copy)
@@ -77,6 +77,9 @@
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
+          if(logger.isTraceEnabled)
+            logger.trace("Dequeued item for topic %s and partition %d"
+              .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
           // handle the dequeued current item
           if(cbkHandler != null)
             events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
@@ -97,6 +100,9 @@
           events = new ListBuffer[QueueItem[T]]
         }
       }
+      if(queue.size > 0)
+        throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
+          .format(queue.size))
       if(cbkHandler != null) {
         logger.info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
         val addedEvents = cbkHandler.lastBatchBeforeClose
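Taken together, the AsyncProducer.close() reordering and the new check above form a small shutdown handshake: flip the closed flag before enqueuing the Shutdown token, then treat anything still queued after the send thread drains past the token as a bug. A miniature of that handshake (all names here are illustrative, and IllegalStateException stands in for the new IllegalQueueStateException):

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownSketch {
  private val closed = new AtomicBoolean(false)
  private val queue = new LinkedBlockingQueue[String]()
  private val ShutdownToken = "<shutdown>"

  def send(event: String): Unit = {
    if (closed.get) throw new IllegalStateException("producer is closed")
    queue.put(event)
  }

  def close(): Unit = {
    closed.set(true)         // reject new events first...
    queue.put(ShutdownToken) // ...then signal the consumer to drain and stop
  }

  // Run by the send thread: drain until the token, then verify emptiness,
  // mirroring the new IllegalQueueStateException check.
  def drainLoop(): Unit = {
    var item = queue.take()
    while (item != ShutdownToken) {
      // ... hand the item to the sync producer ...
      item = queue.take()
    }
    if (queue.size > 0)
      throw new IllegalStateException(
        "%d items remain in the queue after shutdown".format(queue.size))
  }
}

Before this patch, closed was set only after producer.close, leaving a window in which events could be enqueued behind the Shutdown token and silently dropped; the new queue.size check turns any such leftovers into a loud failure instead.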