Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1245727)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -26,9 +26,11 @@
 import scala.math._
 import kafka.common.MessageSizeTooLargeException
 import java.nio.ByteBuffer
+import java.util.Random
 
 object SyncProducer {
   val RequestKey: Short = 0
+  val randomGenerator = new Random()
 }
 
 /*
@@ -40,8 +42,10 @@
   private val MaxConnectBackoffMs = 60000
   private var channel : SocketChannel = null
   private var sentOnConnection = 0
-  private var lastConnectionTime = System.currentTimeMillis
+  /** Set to a random value between (now - reconnectInterval) and now **/
+  private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval
+
   private val lock = new Object()
   @volatile
   private var shutdown: Boolean = false
@@ -141,9 +145,12 @@
   }
 
   private def verifyMessageSize(messages: ByteBufferMessageSet) {
-    for (messageAndOffset <- messages)
+    val shallowIter = messages.shallowIterator
+    while (shallowIter.hasNext) {
+      val messageAndOffset = shallowIter.next
       if (messageAndOffset.message.payloadSize > config.maxMessageSize)
         throw new MessageSizeTooLargeException
+    }
   }
 
   /**
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1245727)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -38,8 +38,8 @@
       processedEvents = cbkHandler.beforeSendingData(events)
 
     if(logger.isTraceEnabled)
-      processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d"
-        .format(event.getTopic, event.getPartition)))
+      processedEvents.foreach(events => trace("Handling event for Topic: %s, Partition: %d"
+        .format(events.getTopic, events.getPartition)))
     send(serialize(collate(processedEvents), serializer), syncProducer)
   }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1245727)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -77,6 +77,34 @@
 
   override def iterator: Iterator[MessageAndOffset] = deepIterator
 
+  /** The shallow iterator is used by SyncProducer.verifyMessageSize() to check that each top-level message is below the maxMessageSize threshold **/
+  def shallowIterator(): Iterator[MessageAndOffset] = {
+    ErrorMapping.maybeThrowException(errorCode)
+    new IteratorTemplate[MessageAndOffset] {
+      var topIter = buffer.slice()
+      var currValidBytes = initialOffset
+
+      override def makeNext: MessageAndOffset = {
+        if (topIter.remaining < 4) {
+          return allDone()
+        }
+        val size = topIter.getInt()
+
+        trace("Remaining bytes in iterator = " + topIter.remaining)
+        trace("size of data = " + size)
+
+        val message = topIter.slice()
+        message.limit(size)
+        topIter.position(topIter.position + size)
+        val newMessage = new Message(message)
+        currValidBytes += 4 + size
+        trace("currValidBytes = " + currValidBytes)
+        new MessageAndOffset(newMessage, currValidBytes)
+      }
+    }
+  }
+
+
   private def deepIterator(): Iterator[MessageAndOffset] = {
     ErrorMapping.maybeThrowException(errorCode)
     new IteratorTemplate[MessageAndOffset] {
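
Two notes on how the patch behaves. The randomized lastConnectionTime spreads each producer's first age-based reconnect uniformly across the reconnectInterval window, so a fleet of producers started at the same moment does not tear down and re-open sockets in lockstep. The shallow iterator walks only the top level of the message set: it reads each 4-byte size prefix and skips over the payload without decompressing nested message sets, which is what keeps verifyMessageSize() cheap enough to run on every send.

Below is a minimal standalone sketch of that size-prefixed framing, assuming frames are laid out as [4-byte size][payload]; ShallowWalkSketch and frameSizes are illustrative names, not part of the Kafka API:

    import java.nio.ByteBuffer

    // Hypothetical sketch, not Kafka code: walk a buffer of
    // [4-byte size][payload] frames the way the shallow iterator does,
    // without looking inside any payload.
    object ShallowWalkSketch {
      // Returns the size of every complete top-level frame in the buffer.
      def frameSizes(messageSet: ByteBuffer): List[Int] = {
        val iter = messageSet.slice()           // leave the caller's position untouched
        var sizes = List.empty[Int]
        while (iter.remaining >= 4) {
          val size = iter.getInt()              // read the 4-byte length prefix
          if (iter.remaining < size)
            return sizes.reverse                // truncated trailing frame: stop, like allDone()
          iter.position(iter.position + size)   // skip the payload itself
          sizes = size :: sizes
        }
        sizes.reverse
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.allocate(64)
        for (payload <- Seq("abc", "defgh")) {
          buf.putInt(payload.length)            // size prefix
          buf.put(payload.getBytes("UTF-8"))    // payload bytes
        }
        buf.flip()
        println(frameSizes(buf))                // prints List(3, 5)
      }
    }

The same skip-by-size loop is why the patched verifyMessageSize() only bounds the size of each top-level (possibly compressed) message rather than every nested message, which matches the broker-side size check.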