Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala	(revision 1415417)
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -170,7 +170,7 @@
       AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     } catch {
-      case e: AdministrationException => // this is good
+      case e: TopicExistsException => // this is good
       case e2 => throw e2
     }
   }
Index: core/src/main/scala/kafka/cluster/Broker.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Broker.scala	(revision 1415417)
+++ core/src/main/scala/kafka/cluster/Broker.scala	(working copy)
@@ -47,8 +47,10 @@
   
   override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
 
-  def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
+  def getZkString(): String = new String(creatorId + ":" + host + ":" + port)
 
+  def getConnectionString(): String = new String(host + ":" + port)
+
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
     writeShortString(buffer, creatorId)
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1415417)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -41,13 +41,13 @@
   /**
    * Used in ClientUtils to send TopicMetadataRequest to a broker.
    */
-  def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
-    val props = new Properties()
-    props.put("host", broker.host)
-    props.put("port", broker.port.toString)
-    props.put("client.id", clientId)
-    new SyncProducer(new SyncProducerConfig(props))
-  }
+//  def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
+//    val props = new Properties()
+//    props.put("host", broker.host)
+//    props.put("port", broker.port.toString)
+//    props.put("client.id", clientId)
+//    new SyncProducer(new SyncProducerConfig(props))
+//  }
 }
 
 class ProducerPool(val config: ProducerConfig) extends Logging {
Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(revision 1415417)
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(working copy)
@@ -64,5 +64,5 @@
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = 500
+  val DefaultAckTimeoutMs = 1500
 }
\ No newline at end of file
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1415417)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package kafka.producer
 
 import collection.mutable.HashMap
@@ -72,7 +72,7 @@
    */
   def updateInfo(topics: Set[String]) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1415417)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -48,7 +48,7 @@
     lock synchronized {
       val serializedData = serialize(events)
       serializedData.foreach{
-        keyed => 
+        keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
           producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
@@ -82,7 +82,7 @@
         try {
           for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
             if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent => 
+              messagesPerBrokerMap.foreach(partitionAndEvent =>
                 trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
             val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
 
@@ -173,7 +173,7 @@
     debug("Broker partitions registered for topic: %s are %s"
       .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
-    if(totalNumPartitions == 0) 
+    if(totalNumPartitions == 0)
       throw new NoBrokersForPartitionException("Partition key = " + m.key)
     topicPartitionsList
   }
@@ -189,10 +189,10 @@
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
         "\n Valid values are > 0")
-    val partition = 
-      if(key == null) 
+    val partition =
+      if(key == null)
         Utils.abs(counter.getAndIncrement()) % numPartitions
-      else 
+      else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
@@ -217,11 +217,15 @@
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        trace("Producer sent messages for topics %s to broker %d on %s:%d"
+        debug("Producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         if (response.status.size != producerRequest.data.size)
           throw new KafkaException("Incomplete response (%s) for producer request (%s)"
-                                           .format(response, producerRequest))
+            .format(response, producerRequest))
+        if (logger.isTraceEnabled) {
+          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => trace("%s".format(Utils.readString(message.message.payload)))))
+        }
         response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
           .map(partitionStatus => partitionStatus._1)
       } catch {
@@ -236,33 +240,33 @@
 
   private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
     /** enforce the compressed.topics config here.
-     *  If the compression codec is anything other than NoCompressionCodec,
-     *    Enable compression only for specified topics if any
-     *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-     *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
-     */
+      *  If the compression codec is anything other than NoCompressionCodec,
+      *    Enable compression only for specified topics if any
+      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      */
 
     val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
       val rawMessages = messages.map(_.message)
       ( topicAndPartition,
         config.compressionCodec match {
           case NoCompressionCodec =>
-            trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
             new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
-                trace("Sending %d messages with compression codec %d to %s"
+                debug("Sending %d messages with compression codec %d to %s"
                   .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                 new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndPartition.topic)) {
-                  trace("Sending %d messages with compression codec %d to %s"
+                  debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                 }
                 else {
-                  trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                  debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
                     .format(messages.size, topicAndPartition, config.compressedTopics.toString))
                   new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
                 }
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala	(revision 1415417)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala	(working copy)
@@ -25,7 +25,8 @@
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.mutable
-import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
+import kafka.common._
+import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -82,7 +83,7 @@
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
-      case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
+      case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
       case e2 => throw new AdministrationException(e2.toString)
     }
   }
Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(revision 1415417)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(working copy)
@@ -176,6 +176,7 @@
       }
     })
 
+    var numMessages = 0
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
     try {
@@ -188,6 +189,7 @@
       for(messageAndTopic <- iter) {
         try {
           formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+          numMessages += 1
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -198,6 +200,7 @@
         if(System.out.checkError()) {
           // This means no one is listening to our output stream any more, time to shutdown
           System.err.println("Unable to write to standard out, closing consumer.")
+          System.err.println("Consumed %d messages".format(numMessages))
           formatter.close()
           connector.shutdown()
           System.exit(1)
@@ -206,6 +209,7 @@
     } catch {
       case e => error("Error processing message, stopping consumer: ", e)
     }
+    System.out.println("Consumed %d messages".format(numMessages))
     System.out.flush()
     formatter.close()
     connector.shutdown()
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(revision 1415417)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(working copy)
@@ -54,7 +54,7 @@
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
           val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
           topicsMetadata.foreach(
             tmd => {
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1415417)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -402,7 +402,7 @@
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1415417)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -184,7 +184,7 @@
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
     try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision 1415417)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(working copy)
@@ -125,7 +125,7 @@
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1415417)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -434,9 +434,13 @@
             try {
               /* check if auto creation of topics is turned on */
               if (config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                             .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                try {
+                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+                  info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                               .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                } catch {
+                  case e: TopicExistsException => // let it go, possibly another broker created this topic
+                }
                 val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
                 topicsMetadata += newTopicMetadata
                 newTopicMetadata.errorCode match {
Index: core/src/main/scala/kafka/client/ClientUtils.scala
===================================================================
--- core/src/main/scala/kafka/client/ClientUtils.scala	(revision 1415417)
+++ core/src/main/scala/kafka/client/ClientUtils.scala	(working copy)
@@ -6,20 +6,28 @@
 import kafka.producer._
 import kafka.common.KafkaException
 import kafka.utils.{Utils, Logging}
+import java.util.Properties
 
 /**
  * Helper functions common to clients (producer, consumer, or admin)
  */
 object ClientUtils extends Logging{
 
-  def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
+  /**
+   * Used by the producer to send a metadata request since it has access to the ProducerConfig
+   * @param topics The topics for which the metadata needs to be fetched
+   * @param brokers The brokers in the cluster as configured on the producer through broker.list
+   * @param producerConfig The producer's config
+   * @return topic metadata response
+   */
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
+      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
       info("Fetching metadata for topic %s".format(topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
@@ -39,8 +47,23 @@
     }
     return topicMetadataResponse
   }
-  
+
   /**
+   * Used by a non-producer client to send a metadata request
+   * @param topics The topics for which the metadata needs to be fetched
+   * @param brokers The brokers in the cluster as configured on the client
+   * @param clientId The client's identifier
+   * @return topic metadata response
+   */
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
+    val props = new Properties()
+    props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
+    props.put("clientid", clientId)
+    val producerConfig = new ProducerConfig(props)
+    fetchTopicMetadata(topics, brokers, producerConfig)
+  }
+
+  /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
    */
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(revision 1415417)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(working copy)
@@ -126,11 +126,6 @@
       .withRequiredArg()
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(-1)
-    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
             "set, the csv metrics will be outputed here")
@@ -223,6 +218,7 @@
     private val threadIdLabel  = "ThreadID"
     private val topicLabel     = "Topic"
     private var leftPaddedSeqId : String = ""
+
     private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
       // Each thread gets a unique range of sequential no. for its ids.
       // Eg. 1000 msg in 10 threads => 100 msg per thread
@@ -246,12 +242,13 @@
 
     private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
       val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      val message = if(config.seqIdMode) {
-        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-        generateMessageWithSeqId(topic, seqId, msgSize)
-      } else {
-        new Array[Byte](msgSize)
-      }
+      val message =
+        if(config.seqIdMode) {
+          val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+          generateMessageWithSeqId(topic, seqId, msgSize)
+        } else {
+          new Array[Byte](msgSize)
+        }
       (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
     }
 
