From b6077dd727bbaf80a85e45f2e216feb0ae4fb88e Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Fri, 22 Feb 2013 15:12:41 -0800 Subject: [PATCH 1/4] changes for mem fix --- core/src/main/scala/kafka/api/ProducerRequest.scala | 13 +++++++++---- core/src/main/scala/kafka/network/RequestChannel.scala | 5 +++-- .../kafka/producer/async/DefaultEventHandler.scala | 8 ++++---- core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++++++++++-------- .../kafka/api/RequestResponseSerializationTest.scala | 2 +- .../scala/unit/kafka/network/SocketServerTest.scala | 8 +++++--- .../scala/unit/kafka/producer/SyncProducerTest.scala | 3 ++- 7 files changed, 34 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 72b2cba..8842ce9 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -49,7 +49,7 @@ object ProducerRequest { }) }) - ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*)) + ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, collection.mutable.Map(partitionDataPairs:_*)) } } @@ -58,19 +58,20 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, - data: Map[TopicAndPartition, ByteBufferMessageSet]) + data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { /** * Partitions the data into a map of maps (one for each topic). */ private lazy val dataGroupedByTopic = data.groupBy(_._1.topic) + val topicPartitionMessageSizeMap = data.map(r => r._1 -> r._2.sizeInBytes).toMap def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, - data: Map[TopicAndPartition, ByteBufferMessageSet]) = + data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) def writeTo(buffer: ByteBuffer) { @@ -130,7 +131,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, producerRequest.append("; ClientId: " + clientId) producerRequest.append("; RequiredAcks: " + requiredAcks) producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") - producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(",")) + producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.map(r => r._1 -> r._2).toMap.mkString(",")) producerRequest.toString() } @@ -142,5 +143,9 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, val errorResponse = ProducerResponse(correlationId, producerResponseStatus) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } + + def emptyData(){ + data.clear() + } } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 931092d..b21d9df 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -32,7 +32,7 @@ object RequestChannel extends Logging { val AllDone = new Request(1, 2, getShutdownReceive(), 0) def getShutdownReceive() = { - val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]()) + val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2) byteBuffer.putShort(RequestKeys.ProduceKey) emptyProducerRequest.writeTo(byteBuffer) @@ -40,13 +40,14 @@ object RequestChannel extends Logging { byteBuffer } - case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) { + case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) { @volatile var dequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @volatile var responseCompleteTimeMs = -1L val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer.rewind() + buffer = null trace("Received request : %s".format(requestObj)) def updateRequestMetrics() { diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 8954a03..cc45700 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -142,8 +142,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, serializedMessages } - def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = { - val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] + def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = { + val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] try { for (message <- messages) { val topicPartitionsList = getPartitionListForTopic(message) @@ -227,7 +227,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, * @param messagesPerTopic the messages as a map from (topic, partition) -> messages * @return the set (topic, partitions) messages which incurred an error sending or processing */ - private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = { + private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = { if(brokerId < 0) { warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(","))) messagesPerTopic.keys.toSeq @@ -270,7 +270,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } } - private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = { + private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = { /** enforce the compressed.topics config here. * If the compression codec is anything other than NoCompressionCodec, * Enable compression only for specified topics if any diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5351e7c..cfabfc1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -99,8 +99,8 @@ class KafkaApis(val requestChannel: RequestChannel, * Check if a partitionData from a produce request can unblock any * DelayedFetch requests. */ - def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) { - val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messages) + def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) { + val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes) trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size)) // send any newly unblocked responses @@ -122,7 +122,7 @@ class KafkaApis(val requestChannel: RequestChannel, val numPartitionsInError = localProduceResults.count(_.error.isDefined) produceRequest.data.foreach(partitionAndData => - maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2)) + maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes)) val allPartitionHaveReplicationFactorOne = !produceRequest.data.keySet.exists( @@ -162,6 +162,8 @@ class KafkaApis(val requestChannel: RequestChannel, debug(satisfiedProduceRequests.size + " producer requests unblocked during produce to local log.") satisfiedProduceRequests.foreach(_.respond()) + // we do not need the data anymore + produceRequest.emptyData() } } @@ -438,14 +440,14 @@ class KafkaApis(val requestChannel: RequestChannel, * A holding pen for fetch requests waiting to be satisfied */ class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int) - extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) { + extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) { this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId) /** * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field */ - def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = { - val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes) + def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = { + val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes) accumulatedSize >= delayedFetch.fetch.minBytes } @@ -546,8 +548,8 @@ class KafkaApis(val requestChannel: RequestChannel, fetchPartitionStatus.error = ErrorMapping.NoError } if (!fetchPartitionStatus.acksPending) { - val messages = produce.data(followerFetchRequestKey.topicAndPartition) - maybeUnblockDelayedFetchRequests(topic, partitionId, messages) + val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition) + maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes) } } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 26f31ec..d0c7b90 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -61,7 +61,7 @@ object SerializationTestUtils{ case(partitionDataMessage, partition) => (TopicAndPartition(topic, partition), partitionDataMessage) }) - collection.immutable.Map(groupedData:_*) + collection.mutable.Map(groupedData:_*) } private val requestInfos = collection.immutable.Map( diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 9322b2c..4c9cd7e 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -60,8 +60,10 @@ class SocketServerTest extends JUnitSuite { /* A simple request handler that just echos back the response */ def processRequest(channel: RequestChannel) { val request = channel.receiveRequest - val id = request.buffer.getShort - val send = new BoundedByteBufferSend(request.buffer.slice) + val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes) + request.requestObj.writeTo(byteBuffer) + byteBuffer.rewind() + val send = new BoundedByteBufferSend(byteBuffer) channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } @@ -80,7 +82,7 @@ class SocketServerTest extends JUnitSuite { val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack = SyncProducerConfig.DefaultRequiredAcks val emptyRequest = - new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) emptyRequest.writeTo(byteBuffer) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 81b2736..5530775 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -78,7 +78,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack: Short = 1 - val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) + val producer = new SyncProducer(new SyncProducerConfig(props)) val response = producer.send(emptyRequest) Assert.assertTrue(response != null) -- 1.7.12.4 (Apple Git-37) From 595a6878cbc19d7cfd7a6bab279f5f359becf542 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Fri, 22 Feb 2013 15:56:49 -0800 Subject: [PATCH 2/4] dont rewind buffer --- core/src/main/scala/kafka/network/RequestChannel.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index b21d9df..209fdfa 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -46,7 +46,6 @@ object RequestChannel extends Logging { @volatile var responseCompleteTimeMs = -1L val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) - buffer.rewind() buffer = null trace("Received request : %s".format(requestObj)) -- 1.7.12.4 (Apple Git-37) From f189caf408b39d1078e88f929a96cb2de3d327e1 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Mon, 25 Feb 2013 10:50:47 -0800 Subject: [PATCH 3/4] Fixing two new test cases --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 2 +- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 4c9cd7e..b347e66 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -112,7 +112,7 @@ class SocketServerTest extends JUnitSuite { val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack: Short = 0 val emptyRequest = - new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) emptyRequest.writeTo(byteBuffer) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 5530775..b5ee31d 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -193,7 +193,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack: Short = 0 - val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) val producer = new SyncProducer(new SyncProducerConfig(props)) val response = producer.send(emptyRequest) Assert.assertTrue(response == null) -- 1.7.12.4 (Apple Git-37) From 151e4f91f8f4b7a1a3a8b8c6586189ba7c56dc0e Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Tue, 26 Feb 2013 00:28:17 -0800 Subject: [PATCH 4/4] Simplyfy toString --- core/src/main/scala/kafka/api/ProducerRequest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 8842ce9..916fb59 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -131,7 +131,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, producerRequest.append("; ClientId: " + clientId) producerRequest.append("; RequiredAcks: " + requiredAcks) producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") - producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.map(r => r._1 -> r._2).toMap.mkString(",")) + producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(",")) producerRequest.toString() } -- 1.7.12.4 (Apple Git-37)