From 854b872c623e98c2316c5ff8494d9f054d4928ff Mon Sep 17 00:00:00 2001
From: Francois Saint-Jacques
Date: Fri, 1 Nov 2013 17:11:09 -0400
Subject: [PATCH] remove debug/trace logging from the producer critical path

After investigating high CPU usage on some producers in production, we
found that a lot of time was being spent constructing strings for DEBUG-
and TRACE-level log messages. This patch removes most of these logging
calls from the critical path; on our systems, it cuts CPU usage to half
of what it used to be.
---
 .../scala/kafka/producer/BrokerPartitionInfo.scala |  4 ----
 core/src/main/scala/kafka/producer/Producer.scala  |  3 ---
 .../main/scala/kafka/producer/SyncProducer.scala   |  4 ----
 .../kafka/producer/async/DefaultEventHandler.scala | 26 +---------------------
 .../kafka/producer/async/ProducerSendThread.scala  |  8 --------
 5 files changed, 1 insertion(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 13a8aa6..5972c77 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -38,7 +38,6 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
    * sequence if no brokers are available.
    */
   def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
-    debug("Getting broker partition info for topic %s".format(topic))
     // check if the cache has metadata for this topic
     val topicMetadata = topicPartitionInfo.get(topic)
     val metadata: TopicMetadata =
@@ -64,10 +63,8 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     partitionMetadata.map { m =>
       m.leader match {
         case Some(leader) =>
-          debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
           new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
         case None =>
-          debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
           new PartitionAndLeader(topic, m.partitionId, None)
       }
     }.sortWith((s, t) => s.partitionId < t.partitionId)
@@ -83,7 +80,6 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       topicsMetadata = topicMetadataResponse.topicsMetadata
       // throw partition specific exception
       topicsMetadata.foreach(tmd =>{
-        trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
         if(tmd.errorCode == ErrorMapping.NoError) {
           topicPartitionInfo.put(tmd.topic, tmd)
         } else
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4798481..f4fc8c8 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -110,9 +110,6 @@ class Producer[K,V](val config: ProducerConfig,
         producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
         producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
-      }else {
-        trace("Added to send queue an event: " + message.toString)
-        trace("Remaining queue size: " + queue.remainingCapacity)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 419156e..a4dbf9a 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -50,11 +50,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
      */
     if (logger.isDebugEnabled) {
       val buffer = new BoundedByteBufferSend(request).buffer
-      trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
       if(requestTypeId == RequestKeys.ProduceKey) {
         val request = ProducerRequest.readFrom(buffer)
-        trace(request.toString)
       }
     }
   }
@@ -72,8 +70,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
         blockingChannel.send(request)
         if(readResponse)
           response = blockingChannel.receive()
-        else
-          trace("Skipping reading response")
       } catch {
         case e: java.io.IOException =>
           // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..9163f67 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -59,7 +59,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     var outstandingProduceRequests = serializedData
     var remainingRetries = config.messageSendMaxRetries + 1
     val correlationIdStart = correlationId.get()
-    debug("Handling %d events".format(events.size))
     while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
@@ -98,9 +97,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
     try {
       for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-        if (logger.isTraceEnabled)
-          messagesPerBrokerMap.foreach(partitionAndEvent =>
-            trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
         val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

         val failedTopicPartitions = send(brokerid, messageSetPerBroker)
@@ -184,8 +180,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,

   private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
-    debug("Broker partitions registered for topic: %s are %s"
-      .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0)
       throw new NoBrokersForPartitionException("Partition key = " + m.key)
@@ -228,7 +222,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
-    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
     partition
   }

@@ -250,19 +243,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     var failedTopicPartitions = Seq.empty[TopicAndPartition]
     try {
       val syncProducer = producerPool.getProducer(brokerId)
-      debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
-        .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
       val response = syncProducer.send(producerRequest)
-      debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
-        .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
       if(response != null) {
         if (response.status.size != producerRequest.data.size)
           throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
-        if (logger.isTraceEnabled) {
-          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
-          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
-            trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
-        }
         val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
         failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
         if(failedTopicPartitions.size > 0) {
@@ -303,23 +287,15 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         ( topicAndPartition,
           config.compressionCodec match {
             case NoCompressionCodec =>
-              debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
               new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
             case _ =>
               config.compressedTopics.size match {
                 case 0 =>
-                  debug("Sending %d messages with compression codec %d to %s"
-                    .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                 case _ =>
                   if(config.compressedTopics.contains(topicAndPartition.topic)) {
-                    debug("Sending %d messages with compression codec %d to %s"
-                      .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                     new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-                  }
-                  else {
-                    debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
-                      .format(messages.size, topicAndPartition, config.compressedTopics.toString))
+                  } else {
                     new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
                   }
               }
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..35abab9 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -70,8 +70,6 @@ class ProducerSendThread[K,V](val threadName: String,
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
-          trace("Dequeued item for topic %s, partition key: %s, data: %s"
-            .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
           events += currentQueueItem
         }

@@ -79,11 +77,6 @@ class ProducerSendThread[K,V](val threadName: String,
         full = events.size >= batchSize
         if(full || expired) {
-          if(expired)
-            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
-          if(full)
-            debug("Batch full. Sending..")
-
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
           events = new ArrayBuffer[KeyedMessage[K,V]]
@@ -99,7 +92,6 @@ class ProducerSendThread[K,V](val threadName: String,
   def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
     val size = events.size
     try {
-      debug("Handling " + size + " events")
       if(size > 0)
         handler.handle(events)
     }catch {
--
1.8.3.4 (Apple Git-47)
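
Note: where a debug/trace call has to stay on a hot path, an alternative to
deleting it is to defer message construction until after the level check.
Below is a minimal Scala sketch of that pattern; the LazyLog object, the
debugLazily method, and the SLF4J backend are illustrative assumptions, not
Kafka's actual Logging trait.

    import org.slf4j.{Logger, LoggerFactory}

    // Hypothetical helper, for illustration only.
    object LazyLog {
      private val logger: Logger = LoggerFactory.getLogger("kafka.producer")

      // `msg` is a by-name parameter: the string is built only if DEBUG is
      // enabled, so a disabled logger never pays for the .format() call.
      def debugLazily(msg: => String): Unit =
        if (logger.isDebugEnabled) logger.debug(msg)
    }

    // Usage: the .format() below runs only when DEBUG logging is on.
    // LazyLog.debugLazily("Handling %d events".format(events.size))

Even with deferred construction, each call site still allocates a thunk for
the by-name argument, so removing the call outright, as this patch does,
stays cheaper on the hottest paths.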