diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 719beb5..cb88037 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -129,7 +129,7 @@ object ConsoleConsumer extends Logging {
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
     if (topicOrFilterOpt.size != 1) {
-      error("Exactly one of whitelist/blacklist/topic is required.")
+      fatal("Exactly one of whitelist/blacklist/topic is required.")
       parser.printHelpOn(System.err)
       System.exit(1)
     }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 71ae640..f265366 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -144,7 +144,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
-    debug("adding partitions with error %s".format(partitionList))
+    debug("Adding partitions with error %s".format(partitionList))
     lock.lock()
     try {
       if (partitionMap != null) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index b80c0b0..978f4e9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -78,7 +78,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
         val cdcFetchOffset = currentDataChunk.fetchOffset
         val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
         if (ctiConsumeOffset < cdcFetchOffset) {
-          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+          error("Consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
             .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
         }
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 64b702b..6d3dc8d 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -30,8 +30,8 @@ class PartitionTopicInfo(val topic: String,
                          private val fetchSize: AtomicInteger,
                          private val clientId: String) extends Logging {
 
-  debug("initial consumer offset of " + this + " is " + consumedOffset.get)
-  debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
+  debug("Initial consumer offset of " + this + " is " + consumedOffset.get)
+  debug("Initial fetch offset of " + this + " is " + fetchedOffset.get)
 
   private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
 
@@ -41,12 +41,12 @@ class PartitionTopicInfo(val topic: String,
 
   def resetConsumeOffset(newConsumeOffset: Long) = {
     consumedOffset.set(newConsumeOffset)
-    debug("reset consume offset of " + this + " to " + newConsumeOffset)
+    debug("Reset consume offset of " + this + " to " + newConsumeOffset)
   }
 
   def resetFetchOffset(newFetchOffset: Long) = {
     fetchedOffset.set(newFetchOffset)
-    debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
+    debug("Reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
   }
 
   /**
@@ -59,7 +59,7 @@ class PartitionTopicInfo(val topic: String,
       trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
-      debug("updated fetch offset of (%s) to %d".format(this, next))
+      debug("Updated fetch offset of (%s) to %d".format(this, next))
       consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
       consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
     }
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 1c28328..4b3efe0 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -73,7 +73,7 @@ class SimpleConsumer(val host: String,
         response = blockingChannel.receive()
       } catch {
         case e : java.io.IOException =>
-          info("Reconnect due to socket error: ", e)
+          info("Reconnect due to socket error\n%s : %s".format(e.getClass, e.getMessage))
           // retry once
           try {
             reconnect()
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c8e8406..cd7cda3 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -68,7 +68,7 @@ private[kafka] object TopicCount extends Logging {
       }
     } catch {
       case e =>
-        error("error parsing consumer json string " + topicCountString, e)
+        error("Error parsing consumer json string " + topicCountString, e)
         throw e
     }
 
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a2ea5a9..5f27282 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -114,7 +114,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   createFetcher()
   if (config.autoCommitEnable) {
     scheduler.startup
-    info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
+    info("Starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
       config.autoCommitIntervalMs, false)
   }
@@ -175,7 +175,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         }
       } catch {
         case e =>
-          fatal("error during consumer connector shutdown", e)
+          error("Error during consumer connector shutdown", e)
       }
       info("ZKConsumerConnector shut down completed")
     }
@@ -183,7 +183,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
       : Map[String,List[KafkaStream[K,V]]] = {
-    debug("entering consume ")
+    debug("entering consume")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
@@ -212,12 +212,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
-    info("begin registering consumer " + consumerIdString + " in ZK")
+    info("Begin registering consumer " + consumerIdString + " in ZK")
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
                              ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
     createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-    info("end registering consumer " + consumerIdString + " in ZK")
+    info("End registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {
@@ -230,14 +230,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def autoCommit() {
-    trace("auto committing")
+    trace("Auto committing")
     try {
       commitOffsets()
     }
     catch {
       case t: Throwable =>
       // log it and let it go
-        error("exception during autoCommit: ", t)
+        error("Exception during autoCommit: ", t)
     }
   }
 
@@ -257,7 +257,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           } catch {
             case t: Throwable =>
               // log it and let it go
-              warn("exception during commitOffsets",  t)
+              warn("Exception during commitOffsets",  t)
           }
           debug("Committed offset " + newOffset + " for topic " + info)
         }
@@ -312,7 +312,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private val cond = lock.newCondition()
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
-        info("starting watcher executor thread for consumer " + consumerIdString)
+        info("Starting watcher executor thread for consumer " + consumerIdString)
         var doRebalance = false
         while (!isShuttingDown.get) {
           try {
@@ -328,10 +328,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (doRebalance)
               syncedRebalance
           } catch {
-            case t => error("error during syncedRebalance", t)
+            case t => error("Error during syncedRebalance", t)
           }
         }
-        info("stopping watcher executor thread for consumer " + consumerIdString)
+        info("Stopping watcher executor thread for consumer " + consumerIdString)
       }
     }
     watcherExecutorThread.start()
@@ -370,7 +370,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def syncedRebalance() {
       rebalanceLock synchronized {
         for (i <- 0 until config.rebalanceMaxRetries) {
-          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+          info("Begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
           val cluster = getCluster(zkClient)
           try {
@@ -381,9 +381,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                * For example, a ZK node can disappear between the time we get all children and the time we try to get
                * the value of a child. Just let this go since another rebalance will be triggered.
                **/
-              info("exception during rebalance ", e)
+              info("Exception during rebalance ", e)
           }
-          info("end rebalancing consumer " + consumerIdString + " try #" + i)
+          info("End rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
             return
           } else {
@@ -408,7 +408,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
         // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
         // are up.
-        warn("no brokers found when trying to rebalance.")
+        warn("No brokers found when trying to rebalance.")
         zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
         true
       }
@@ -570,7 +570,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         } catch {
           case e: ZkNodeExistsException =>
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
-            info("waiting for the partition ownership to be deleted: " + partition)
+            info("Waiting for the partition ownership to be deleted: " + partition)
             false
           case e2 => throw e2
         }
@@ -677,7 +677,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topic = e._1
       val streams = e._2.map(_._2._2).toList
       topicStreamsMap += (topic -> streams)
-      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
+      debug("Adding topic %s and %d streams to map.".format(topic, streams.size))
     })
 
     // listener to consumer and partition changes
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index df83baa..b2e150f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -69,14 +69,14 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
         try {
           if (zkClient != null) {
             val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
-            debug("all topics: %s".format(latestTopics))
+            debug("All topics: %s".format(latestTopics))
 
             eventHandler.handleTopicEvent(latestTopics)
           }
         }
         catch {
           case e =>
-            error("error in handling child changes", e)
+            error("Error in handling child changes", e)
         }
       }
     }
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..52efe10 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -113,7 +113,7 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
   def complete: Boolean = {
     if (current == Nil) {
       if (totalWritten != expectedBytesToWrite)
-        error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
+        error("Mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
       true
     } else {
       false
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 306f200..f93095b 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -50,7 +50,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
      */
     if (logger.isDebugEnabled) {
       val buffer = new BoundedByteBufferSend(request).buffer
-      trace("verifying sendbuffer of size " + buffer.limit)
+      trace("Verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
       if(requestTypeId == RequestKeys.ProduceKey) {
         val request = ProducerRequest.readFrom(buffer)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 7663fac..80d471d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -92,7 +92,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
-      trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+      trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t =>
@@ -144,17 +144,17 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   try {
                     val newOffset = handleOffsetOutOfRange(topicAndPartition)
                     partitionMap.put(topicAndPartition, newOffset)
-                    warn("current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                    warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                       .format(currentOffset.get, topic, partitionId, newOffset))
                   } catch {
                     case e =>
-                      warn("error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                      warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }
                 case _ =>
                   if (isRunning.get) {
-                    warn("error for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id),
-                      ErrorMapping.exceptionFor(partitionData.error))
+                    warn("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                      ErrorMapping.exceptionFor(partitionData.error).getClass))
                     partitionsWithError += topicAndPartition
                   }
               }
@@ -166,7 +166,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     }
 
     if(partitionsWithError.size > 0) {
-      debug("handling partitions with error for %s".format(partitionsWithError))
+      debug("Handling partitions with error for %s".format(partitionsWithError))
       handlePartitionsWithErrors(partitionsWithError)
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 208e3ef..3ee8024 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -76,7 +76,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } catch {
       case e: Throwable =>
         request.requestObj.handleError(e, requestChannel, request)
-        error("error when handling request %s".format(request.requestObj), e)
+        error("Error when handling request %s".format(request.requestObj), e)
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
@@ -349,9 +349,8 @@ class KafkaApis(val requestChannel: RequestChannel,
             case t =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
-                    .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId),
-                    t)
+              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
+                    .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId), t)
               new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
           }
         (TopicAndPartition(topic, partition), partitionData)
@@ -507,7 +506,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             topicsMetadata += topicMetadata
           }
         case _ =>
-          debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
+          error("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
             ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
           topicsMetadata += topicMetadata
       }
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index fed0b86..834c657 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -41,7 +41,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
-        case e: Throwable => error("Exception when handling request")
+        case e: Throwable => error("Exception when handling request", e)
       }
     }
   }
@@ -63,12 +63,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   }
 
   def shutdown() {
-    info("shutting down")
+    info("Shutting down")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
-    info("shutted down completely")
+    info("Shutted down completely")
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 0e6c656..0941c9b 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -35,7 +35,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
 
    def startup() {
      /* start client */
-     info("connecting to ZK: " + config.zkConnect)
+     info("Connecting to ZK: " + config.zkConnect)
      zkClient = KafkaZookeeperClient.getZookeeperClient(config)
      zkClient.subscribeStateChanges(new SessionExpireListener)
      registerBrokerInZk()
@@ -70,9 +70,9 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
      */
     @throws(classOf[Exception])
     def handleNewSession() {
-      info("re-registering broker info in ZK for broker " + config.brokerId)
+      info("Re-registering broker info in ZK for broker " + config.brokerId)
       registerBrokerInZk()
-      info("done re-registering broker")
+      info("Done re-registering broker")
       info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 351dbba..f776b7d 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -28,8 +28,8 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
   }
 
   def shutdown() {
-    info("shutting down")
+    info("Shutting down")
     closeAllFetchers()
-    info("shutdown completed")
+    info("Shutdown completed")
   }  
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 63519e1..957ac13 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -87,7 +87,7 @@ object ImportZkOffsets extends Logging {
       val tokens = s.split(":")
       
       partOffsetsMap += tokens(0) -> tokens(1)
-      debug("adding node path [" + s + "]")
+      debug("Adding node path [" + s + "]")
       
       s = br.readLine()
     }
@@ -100,7 +100,7 @@ object ImportZkOffsets extends Logging {
     var partitions: List[String] = Nil
 
     for ((partition, offset) <- partitionOffsets) {
-      debug("updating [" + partition + "] with offset [" + offset + "]")
+      debug("Updating [" + partition + "] with offset [" + offset + "]")
       
       try {
         ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index c747bfb..6fb545a 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -181,6 +181,7 @@ object MirrorMaker extends Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-" + threadId
+    this.logIdent = "[%s] ".format(threadName)
 
     this.setName(threadName)
 
@@ -204,10 +205,10 @@ object MirrorMaker extends Logging {
         }
       } catch {
         case e =>
-          fatal("%s stream unexpectedly exited.", e)
+          fatal("Stream unexpectedly exited.", e)
       } finally {
         shutdownLatch.countDown()
-        info("Stopped thread %s.".format(threadName))
+        info("Stopped thread.")
       }
     }
 
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 814d61a..2eec997 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -172,11 +172,11 @@ object ReplayLogProducer extends Logging {
           }
         }
       }catch {
-        case e: ConsumerTimeoutException => error("consumer thread timing out", e)
+        case e: ConsumerTimeoutException => error("Consumer thread timing out")
       }
       info("Sent " + messageCount + " messages")
       shutdownLatch.countDown
-      info("thread finished execution !" )
+      info("Thread finished execution !" )
     }
 
     def shutdown() {
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 3cfa384..af3a381 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -206,7 +206,7 @@ object SimpleConsumerShell extends Logging {
               println("Terminating. Reached the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset))
               return
             }
-            debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
+            debug("Multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
             for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
               try {
                 offset = messageAndOffset.nextOffset
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e83eb5f..6264dc4 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -69,7 +69,7 @@ object Utils extends Logging {
         catch {
           case t =>
             // log any error and the stack trace
-            error("error in loggedRunnable", t)
+            error("Error in loggedRunnable", t)
         }
       }
     }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d53d511..22dd4e4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -264,7 +264,7 @@ object ZkUtils extends Logging {
           case e2 => throw e2
         }
         if (storedData == null || storedData != data) {
-          info("conflict in " + path + " data: " + data + " stored data: " + storedData)
+          info("Conflict in " + path + " data: " + data + " stored data: " + storedData)
           throw e
         } else {
           // otherwise, the creation succeeded, return normally
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..de87835 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -161,7 +161,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
     zkConsumerConnector3.shutdown
-    info("all consumer connectors stopped")
+    info("All consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
@@ -240,7 +240,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
     zkConsumerConnector3.shutdown
-    info("all consumer connectors stopped")
+    info("All consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
@@ -296,7 +296,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
           assertTrue(iterator.hasNext())
           val message = iterator.next().message
           receivedMessages ::= message
-          debug("received message: " + message)
+          debug("Received message: " + message)
         }
       }
     }
@@ -398,7 +398,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message
-          debug("received message: " + message)
+          debug("Received message: " + message)
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 2317760..2a09e4a 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -102,7 +102,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
       }
     } catch {
       case e: ConsumerTimeoutException => 
-        info("consumer timed out after receiving " + received + " messages.")
+        info("Consumer timed out after receiving " + received + " messages.")
     } finally {
       producer.close()
       consumerConnector.shutdown
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..f38977c 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -69,7 +69,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     zkConsumerConnector1.shutdown
-    info("all consumer connectors stopped")
+    info("All consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
@@ -111,7 +111,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message
-          debug("received message: " + message)
+          debug("Received message: " + message)
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c4328f0..d629d7c 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -66,7 +66,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch1)
+    debug("Leader Epoc: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     assertTrue("Leader should get elected", leader1.isDefined)
     // NOTE: this is to avoid transient test failures
@@ -80,7 +80,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
       if(leader1.get == 0) None else leader1)
     val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("leader Epoc: " + leaderEpoch2)
+    debug("Leader Epoc: " + leaderEpoch2)
     assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
     if(leader1.get == leader2.get)
       assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
@@ -93,7 +93,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500,
       if(leader2.get == 1) None else leader2)
     val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch3)
+    debug("Leader Epoc: " + leaderEpoch3)
     debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
     assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
     if(leader2.get == leader3.get)
@@ -113,7 +113,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch1)
+    debug("Leader Epoc: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     assertTrue("Leader should get elected", leader1.isDefined)
     // NOTE: this is to avoid transient test failures
