From 3c2dbfba6715387826e4aaccc5c04e8613f7fb70 Mon Sep 17 00:00:00 2001
From: Abhishek Sharma <abhioncbr@yahoo.com>
Date: Fri, 19 Sep 2014 10:39:00 +0530
Subject: [PATCH 1/2] Jira-1591 - As per latest Guozhang Wang comments.

---
 core/src/main/scala/kafka/client/ClientUtils.scala |    4 ++--
 .../kafka/consumer/ConsumerFetcherManager.scala    |    4 ++--
 .../src/main/scala/kafka/consumer/TopicCount.scala |    2 +-
 .../consumer/ZookeeperConsumerConnector.scala      |    2 +-
 .../kafka/network/BoundedByteBufferReceive.scala   |    2 +-
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |    2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |    4 ++--
 .../main/scala/kafka/server/MetadataCache.scala    |    2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    4 ++--
 10 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ebba87f..bd56a28 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -60,8 +60,8 @@ object ClientUtils extends Logging{
       }
       catch {
         case e: Throwable =>
-          warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
-            .format(correlationId, topics, shuffledBrokers(i).toString), e)
+          warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed. %s"
+            .format(correlationId, topics, shuffledBrokers(i).toString, e.toString()))
           t = e
       } finally {
         i = i + 1
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..4420a89 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -85,7 +85,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
             if (!isRunning.get())
               throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
-              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+              warn("Failed to find leader for %s. %s".format(noLeaderPartitionSet, t.toString()))
           }
       } finally {
         lock.unlock()
@@ -101,7 +101,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           if (!isRunning.get())
             throw t /* If this thread is stopped, propagate this exception to kill the thread. */
           else {
-            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
+            warn("Failed to add leader for partitions %s; will retry. %s".format(leaderForPartitionsMap.keySet.mkString(","), t.toString()))
             lock.lock()
             noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
             lock.unlock()
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 0954b3c..6fd13de 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -77,7 +77,7 @@ private[kafka] object TopicCount extends Logging {
       }
     } catch {
       case e: Throwable =>
-        error("error parsing consumer json string " + topicCountString, e)
+        error("error parsing consumer json string %s. %s".format(topicCountString, e.toString()))
         throw e
     }
 
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..5d6e40c 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -433,7 +433,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           }
           catch {
             case e: Exception =>
-              warn("Error while fetching offsets from %s:%d. Possible cause: %s".format(offsetsChannel.host, offsetsChannel.port, e.getMessage))
+              warn("Error while fetching offsets from %s:%d. %s".format(offsetsChannel.host, offsetsChannel.port, e.getMessage))
               offsetsChannel.disconnect()
               None // retry
           }
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
index a442545..3a3e835 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -80,7 +80,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
       buffer = ByteBuffer.allocate(size)
     } catch {
       case e: OutOfMemoryError =>
-        error("OOME with size " + size, e)
+        error("OOME with size %d. %s".format(size, e.toString()))
         throw e
       case e2: Throwable =>
         throw e2
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d678990..5f975de 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -336,7 +336,7 @@ private[kafka] class Processor(val id: Int,
               throw new IllegalStateException("Unrecognized key state for processor thread.")
           } catch {
             case e: EOFException => {
-              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
+              trace("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
               close(key)
             } case e: InvalidRequestException => {
               info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2e9532e..295cbb0 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -98,7 +98,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
+          warn("Error in fetch %s. %s".format(fetchRequest, t.toString()))
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c584b55..69b8924 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -287,8 +287,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
-            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e)
+          error("Error processing ProducerRequest with correlation id %d from client %s on partition %s. %s"
+            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition, e.toString()))
           new ProduceResult(topicAndPartition, e)
        }
     }
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bf81a1a..5388cd0 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -68,7 +68,7 @@ private[server] class MetadataCache {
                 new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
               } catch {
                 case e: Throwable =>
-                  debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+                  debug("Error while fetching metadata for %s. %s".format(topicPartition, e.getMessage))
                   new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
                     ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
               }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 78b7514..254868a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -270,8 +270,8 @@ class ReplicaManager(val config: KafkaConfig,
             case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
-                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
+              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. %s"
+                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage()))
               new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
           }
         (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
-- 
1.7.9.5


From 190de3ffc6e40d5280a3aacc0c4b43a2df03eeb3 Mon Sep 17 00:00:00 2001
From: Abhishek Sharma <abhioncbr@yahoo.com>
Date: Fri, 19 Sep 2014 11:33:38 +0530
Subject: [PATCH 2/2] Jira 1591 - As per latest Guozhang Wang comments.

---
 .../main/scala/kafka/producer/SyncProducer.scala   |    2 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..54bbaf2 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -143,7 +143,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       } catch {
         case e: Exception => {
           disconnect()
-          error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
+          error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful" + e.toString())
           throw e
         }
       }
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 33470ff..8fe6528 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -281,8 +281,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         }
       } catch {
         case t: Throwable =>
-          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
-            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
+          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s. %s"
+            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(","), t.toString()))
           messagesPerTopic.keys.toSeq
       }
     } else {
-- 
1.7.9.5

