From 4f15bb133eb155734bd9e9b70ac7b40d6d8b0032 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 12 Aug 2014 16:37:05 -0700
Subject: [PATCH 1/2] KAFKA-1507. Using GetOffsetShell against non-existent
 topic creates the topic unintentionally. Added CreateTopicRequest to the
 protocol, removed create topic from TopicMetadataRequest.

---
 .../org/apache/kafka/clients/NetworkClient.java    | 111 +++++++++++++++++--
 .../kafka/clients/producer/KafkaProducer.java      |  28 +++--
 .../kafka/clients/producer/ProducerConfig.java     |  23 +++-
 .../kafka/common/errors/TopicExistsException.java  |  37 +++++++
 .../errors/UnableToCreateTopicException.java       |  37 +++++++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   5 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   8 +-
 .../apache/kafka/common/protocol/Protocol.java     |  26 +++++
 .../kafka/common/requests/CreateTopicRequest.java  |  94 ++++++++++++++++
 .../kafka/common/requests/CreateTopicResponse.java |  68 ++++++++++++
 .../apache/kafka/clients/NetworkClientTest.java    |   3 +-
 .../kafka/common/requests/RequestResponseTest.java |  13 +++
 .../main/scala/kafka/api/CreateTopicRequest.scala  | 122 +++++++++++++++++++++
 .../main/scala/kafka/api/CreateTopicResponse.scala |  49 +++++++++
 .../main/scala/kafka/api/CreateTopicResult.scala   |  53 +++++++++
 core/src/main/scala/kafka/api/RequestKeys.scala    |   6 +-
 core/src/main/scala/kafka/client/ClientUtils.scala |  61 +++++++++--
 .../src/main/scala/kafka/common/ErrorMapping.scala |  20 ++--
 .../common/UnableToCreateTopicException.scala      |  23 ++++
 .../scala/kafka/javaapi/CreateTopicRequest.scala   |  79 +++++++++++++
 .../scala/kafka/javaapi/CreateTopicResponse.scala  |  37 +++++++
 .../scala/kafka/javaapi/CreateTopicResult.scala    |  33 ++++++
 .../scala/kafka/producer/BrokerPartitionInfo.scala |  25 +++--
 .../main/scala/kafka/producer/ProducerConfig.scala |  24 ++++
 .../main/scala/kafka/producer/SyncProducer.scala   |  10 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  78 ++++++-----
 .../kafka/api/ProducerFailureHandlingTest.scala    |   1 +
 .../api/RequestResponseSerializationTest.scala     |  11 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |   2 +
 .../unit/kafka/integration/TopicMetadataTest.scala |  26 -----
 .../scala/unit/kafka/producer/ProducerTest.scala   |  28 +++++
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   6 +-
 32 files changed, 1038 insertions(+), 109 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/UnableToCreateTopicException.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/CreateTopicRequest.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/CreateTopicResponse.java
 create mode 100644 core/src/main/scala/kafka/api/CreateTopicRequest.scala
 create mode 100644 core/src/main/scala/kafka/api/CreateTopicResponse.scala
 create mode 100644 core/src/main/scala/kafka/api/CreateTopicResult.scala
 create mode 100644 core/src/main/scala/kafka/common/UnableToCreateTopicException.scala
 create mode 100644 core/src/main/scala/kafka/javaapi/CreateTopicRequest.scala
 create mode 100644 core/src/main/scala/kafka/javaapi/CreateTopicResponse.scala
 create mode 100644 core/src/main/scala/kafka/javaapi/CreateTopicResult.scala
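
Note on the client-side flow added in this patch: a metadata response that
returns UNKNOWN_TOPIC_OR_PARTITION (and no usable nodes) queues the topic in
nonExistentTopics; the next poll() batches the queue into a single
CreateTopicRequest; a NONE or TOPIC_EXISTS result code settles the topic. A
condensed, illustrative model of that bookkeeping follows (not the literal
patch code; the real NetworkClient additionally holds off metadata refreshes
while a createTopic request is in flight):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CreateTopicFlowSketch {
        enum Errors { NONE, UNKNOWN_TOPIC_OR_PARTITION, UNABLE_TO_CREATE_TOPIC, TOPIC_EXISTS }

        private final Set<String> nonExistentTopics = new HashSet<String>();
        private boolean createTopicInProgress = false;

        // on a metadata response with errors: queue the unknown topics
        void handleMetadataErrors(Map<String, Errors> errors) {
            for (Map.Entry<String, Errors> entry : errors.entrySet())
                if (entry.getValue() == Errors.UNKNOWN_TOPIC_OR_PARTITION)
                    nonExistentTopics.add(entry.getKey());
        }

        // in poll(): at most one batched CreateTopicRequest in flight
        boolean shouldSendCreateTopic() {
            return !createTopicInProgress && !nonExistentTopics.isEmpty();
        }

        // on a CreateTopicResponse: success or "already exists" settles the topic
        void handleCreateTopicResponse(Map<String, Errors> results) {
            for (Map.Entry<String, Errors> entry : results.entrySet())
                if (entry.getValue() == Errors.NONE || entry.getValue() == Errors.TOPIC_EXISTS)
                    nonExistentTopics.remove(entry.getKey());
            createTopicInProgress = false;
        }
    }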
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index eea270a..f2306a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -18,6 +18,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
 
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
@@ -27,9 +29,12 @@ import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.CreateTopicRequest;
+import org.apache.kafka.common.requests.CreateTopicResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -53,6 +58,21 @@ public class NetworkClient implements KafkaClient {
 
     /* the current cluster metadata */
     private final Metadata metadata;
 
+    /* auto create non existent topics */
+    private boolean autoCreateTopics;
+
+    /* topics to be created */
+    private final Set<String> nonExistentTopics;
+
+    /* number of partitions for the nonExistentTopics to be created */
+    private final int numPartitions;
+
+    /* replication factor for the nonExistentTopics to be created */
+    private final int replicationFactor;
+
+    /* topic config */
+    private final List<String> topicConfig;
+
     /* the state of each node's connection */
     private final ClusterConnectionStates connectionStates;
 
@@ -74,9 +94,12 @@ public class NetworkClient implements KafkaClient {
     /* the current correlation id to use when sending requests to servers */
     private int correlation;
 
-    /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
+    /* true if there is a metadata request that has been sent and for which we have not yet received a response */
     private boolean metadataFetchInProgress;
 
+    /* true if there is a createTopic request that has been sent and for which we have not yet received a response */
+    private boolean createTopicInProgress;
+
     /* the last timestamp when no broker node is available to connect */
     private long lastNoNodeAvailableMs;
 
@@ -86,7 +109,11 @@ public class NetworkClient implements KafkaClient {
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
-                         int socketReceiveBuffer) {
+                         int socketReceiveBuffer,
+                         boolean autoCreateTopics,
+                         int numPartitions,
+                         int replicationFactor,
+                         List<String> topicConfig) {
         this.selector = selector;
         this.metadata = metadata;
         this.clientId = clientId;
@@ -97,7 +124,13 @@ public class NetworkClient implements KafkaClient {
         this.correlation = 0;
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
         this.metadataFetchInProgress = false;
+        this.createTopicInProgress = false;
         this.lastNoNodeAvailableMs = 0;
+        this.autoCreateTopics = autoCreateTopics;
+        this.numPartitions = numPartitions;
+        this.replicationFactor = replicationFactor;
+        this.topicConfig = topicConfig;
+        this.nonExistentTopics = new HashSet<String>();
     }
 
     /**
@@ -154,7 +187,6 @@ public class NetworkClient implements KafkaClient {
     @Override
     public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
-
         for (int i = 0; i < requests.size(); i++) {
             ClientRequest request = requests.get(i);
             int nodeId = request.request().destination();
@@ -165,12 +197,17 @@ public class NetworkClient implements KafkaClient {
             sends.add(request.request());
         }
 
+        // check createTopic list
+        if (!this.createTopicInProgress && nonExistentTopics.size() > 0) {
+            createTopics(sends, now);
+        }
+
         // should we update our metadata?
         long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
         long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now;
         // if there is no node available to connect, back off refreshing metadata
         long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt);
-        if (!this.metadataFetchInProgress && metadataTimeout == 0)
+        if (!this.createTopicInProgress && !this.metadataFetchInProgress && metadataTimeout == 0)
             maybeUpdateMetadata(sends, now);
 
         // do the I/O
@@ -282,6 +319,8 @@ public class NetworkClient implements KafkaClient {
             correlate(req.request().header(), header);
             if (apiKey == ApiKeys.METADATA.id) {
                 handleMetadataResponse(req.request().header(), body, now);
+            } else if (apiKey == ApiKeys.CREATE_TOPIC.id) {
+                handleCreateTopicResponse(req.request().header(), body);
             } else {
                 // need to add body/header to response here
                 responses.add(new ClientResponse(req, now, false, body));
@@ -295,10 +334,28 @@ public class NetworkClient implements KafkaClient {
         Cluster cluster = response.cluster();
         // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
         // created which means we will get errors and no nodes until it exists
-        if (cluster.nodes().size() > 0)
+        if (cluster.nodes().size() > 0) {
             this.metadata.update(cluster, now);
-        else
+        } else if (autoCreateTopics && response.errors().size() > 0) {
+            for (String topic : response.errors().keySet()) {
+                if (response.errors().get(topic) == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                    nonExistentTopics.add(topic);
+                }
+            }
+        } else {
             log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+        }
+    }
+
+    private void handleCreateTopicResponse(RequestHeader header, Struct body) {
+        CreateTopicResponse response = new CreateTopicResponse(body);
+        Map<String, Errors> topics = response.topics();
+        for (String topic : topics.keySet()) {
+            if (topics.get(topic) == Errors.NONE || topics.get(topic) == Errors.TOPIC_EXISTS) {
+                nonExistentTopics.remove(topic);
+            }
+        }
+        this.createTopicInProgress = false;
     }
 
     /**
@@ -315,6 +372,8 @@ public class NetworkClient implements KafkaClient {
             ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
             if (requestKey == ApiKeys.METADATA)
                 metadataFetchInProgress = false;
+            else if (requestKey == ApiKeys.CREATE_TOPIC)
+                createTopicInProgress = false;
             else
                 responses.add(new ClientResponse(request, now, true, null));
         }
@@ -355,6 +414,16 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Create a createTopic request for the given topics
+     */
+    private ClientRequest createTopicRequest(long now, int node, Set<String> topics) {
+        CreateTopicRequest createTopicRequest = new CreateTopicRequest(new ArrayList<String>(topics), numPartitions,
+                                                                       replicationFactor, topicConfig);
+        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.CREATE_TOPIC), createTopicRequest.toStruct());
+        return new ClientRequest(now, true, send, null);
+    }
+
+    /**
      * Add a metadata request to the list of sends if we can make one
      */
     private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
@@ -365,7 +434,6 @@ public class NetworkClient implements KafkaClient {
             this.lastNoNodeAvailableMs = now;
             return;
         }
-
         log.debug("Trying to send metadata request to node {}", node.id());
         if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
             Set<String> topics = metadata.topics();
@@ -382,6 +450,31 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Add a createTopic request to the list of sends if we can make one
+     */
+    private void createTopics(List<NetworkSend> sends, long now) {
+        Node node = this.leastLoadedNode(now);
+        if (node == null) {
+            log.debug("Give up sending createTopic request since no node is available");
+            // mark the timestamp for no node available to connect
+            this.lastNoNodeAvailableMs = now;
+            return;
+        }
+        log.debug("Trying to send createTopic request to node {}", node.id());
+        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
+            this.createTopicInProgress = true;
+            ClientRequest createTopicRequest = createTopicRequest(now, node.id(), nonExistentTopics);
+            log.debug("Sending createTopic request {} to node {}", createTopicRequest, node.id());
+            sends.add(createTopicRequest.request());
+            this.inFlightRequests.add(createTopicRequest);
+        } else if (connectionStates.canConnect(node.id(), now)) {
+            // we don't have a connection to this node right now, make one
+            log.debug("Init connection to node {} for sending createTopic request in the next iteration", node.id());
+            initiateConnect(node, now);
+        }
+    }
+
+    /**
      * Initiate a connection to the given node
      */
     private void initiateConnect(Node node, long now) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f58b850..72141d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -3,9 +3,9 @@
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
@@ -75,6 +75,10 @@ public class KafkaProducer implements Producer {
     private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
+    private final Boolean autoCreateTopics;
+    private final int numPartitions;
+    private final int replicationFactor;
+    private final List<String> topicConfig;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -113,6 +117,10 @@ public class KafkaProducer implements Producer {
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+        this.autoCreateTopics = config.getBoolean(ProducerConfig.AUTO_CREATE_TOPICS_ENABLE);
+        this.numPartitions = config.getInt(ProducerConfig.TOPICS_NUM_PARTITIONS);
+        this.replicationFactor = config.getInt(ProducerConfig.TOPICS_REPLICATION_FACTOR);
+        this.topicConfig = config.getList(ProducerConfig.TOPICS_CONFIG);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
@@ -129,7 +137,11 @@ public class KafkaProducer implements Producer {
                                          config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                                          config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                                          config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                                         config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
+                                         config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+                                         this.autoCreateTopics,
+                                         this.numPartitions,
+                                         this.replicationFactor,
+                                         this.topicConfig);
         this.sender = new Sender(client,
                                  this.metadata,
                                  this.accumulator,
@@ -180,14 +192,14 @@ public class KafkaProducer implements Producer {
      * sending the record.
      * <p>
      * If you want to simulate a simple blocking call you can do the following:
-     * 
+     *
      * <pre>
      *   producer.send(new ProducerRecord("the-topic", "key", "value")).get();
      * </pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
-     * 
+     *
      * <pre>
      *   ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
      *   producer.send(myRecord,
@@ -199,10 +211,10 @@ public class KafkaProducer implements Producer {
      *                     }
      *                 });
      * </pre>
-     * 
+     *
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example callback1 is guaranteed to execute before callback2:
-     * 
+     *
      * <pre>
      * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
@@ -219,7 +231,7 @@ public class KafkaProducer implements Producer {
      * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
      * in cases where non-blocking usage is desired the setting block.on.buffer.full=false will cause the
      * producer to instead throw an exception when buffer memory is exhausted.
-     * 
+     *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..ce73e70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -53,6 +53,21 @@ public class ProducerConfig extends AbstractConfig {
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions.";
 
+    /** auto.create.topics.enable */
+    public static final String AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable";
+    private static final String AUTO_CREATE_TOPICS_ENABLE_DOC = "Enable auto-creation of topics on the producer. If set to true, an attempt to produce to a non-existent topic will automatically create it with the configured number of partitions and replication factor.";
+
+    /** topics.num.partitions */
+    public static final String TOPICS_NUM_PARTITIONS = "topics.num.partitions";
+    private static final String TOPICS_NUM_PARTITIONS_DOC = "The number of partitions per auto-created topic. Takes effect only when auto.create.topics.enable is set to true.";
+    /** topics.replication.factor */
+    public static final String TOPICS_REPLICATION_FACTOR = "topics.replication.factor";
+    private static final String TOPICS_REPLICATION_FACTOR_DOC = "The replication factor per auto-created topic. Takes effect only when auto.create.topics.enable is set to true.";
+    /** topics.config */
+    public static final String TOPICS_CONFIG = "topics.config";
+    private static final String TOPICS_CONFIG_DOC = "A list of key=value configuration overrides applied to auto-created topics.";
+
+
     /** batch.size */
     public static final String BATCH_SIZE_CONFIG = "batch.size";
     private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
@@ -200,6 +215,10 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         METADATA_FETCH_TIMEOUT_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+                                .define(AUTO_CREATE_TOPICS_ENABLE, Type.BOOLEAN, true, Importance.LOW, AUTO_CREATE_TOPICS_ENABLE_DOC)
+                                .define(TOPICS_NUM_PARTITIONS, Type.INT, 1, Importance.LOW, TOPICS_NUM_PARTITIONS_DOC)
+                                .define(TOPICS_REPLICATION_FACTOR, Type.INT, 1, Importance.LOW, TOPICS_REPLICATION_FACTOR_DOC)
+                                .define(TOPICS_CONFIG, Type.LIST, "", Importance.LOW, TOPICS_CONFIG_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
                                         30000,
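
For reference, a sketch of a producer opting into the settings added above.
The config keys and their defaults (true, 1, 1, "") come from this patch; the
broker address and override values are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AutoCreateConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("auto.create.topics.enable", "true");
            props.put("topics.num.partitions", "3");
            props.put("topics.replication.factor", "2");
            props.put("topics.config", "segment.bytes=1048576");  // key=value list

            KafkaProducer producer = new KafkaProducer(props);
            // the first send to a missing topic now triggers a CreateTopicRequest
            // instead of relying on TopicMetadataRequest side effects
            producer.send(new ProducerRecord("new-topic", "key".getBytes(), "value".getBytes()));
            producer.close();
        }
    }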
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java
new file mode 100644
index 0000000..9137222
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicExistsException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that a topic the request attempted to create already exists
+ */
+public class TopicExistsException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TopicExistsException() {
+    }
+
+    public TopicExistsException(String message) {
+        super(message);
+    }
+
+    public TopicExistsException(Throwable throwable) {
+        super(throwable);
+    }
+
+    public TopicExistsException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnableToCreateTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnableToCreateTopicException.java
new file mode 100644
index 0000000..f50c5dc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnableToCreateTopicException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that the server was unable to create the requested topic
+ */
+public class UnableToCreateTopicException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnableToCreateTopicException() {
+    }
+
+    public UnableToCreateTopicException(String message) {
+        super(message);
+    }
+
+    public UnableToCreateTopicException(Throwable throwable) {
+        super(throwable);
+    }
+
+    public UnableToCreateTopicException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 109fc96..8b0eea8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -34,7 +34,8 @@ public enum ApiKeys {
     OFFSET_FETCH(9, "offset_fetch"),
     CONSUMER_METADATA(10, "consumer_metadata"),
     JOIN_GROUP(11, "join_group"),
-    HEARTBEAT(12, "heartbeat");
+    HEARTBEAT(12, "heartbeat"),
+    CREATE_TOPIC(13, "create_topic");
 
     private static ApiKeys[] codeToType;
     public static int MAX_API_KEY = -1;
@@ -63,4 +64,4 @@ public enum ApiKeys {
     public static ApiKeys forId(int id) {
         return codeToType[id];
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3374bd9..09470b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -30,12 +30,14 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnableToCreateTopicException;
+import org.apache.kafka.common.errors.TopicExistsException;
 
 
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
  * are thus part of the protocol. The names can be changed but the error code cannot.
- * 
+ *
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
@@ -51,7 +53,9 @@ public enum Errors {
     // TODO: errorCode 8, 9, 11
     MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
+    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
+    UNABLE_TO_CREATE_TOPIC(14, new UnableToCreateTopicException("The create topic request failed.")),
+    TOPIC_EXISTS(15, new TopicExistsException("The topic to be created already exists."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
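
For reference, the two new codes round-trip through the existing Errors
accessors (forCode, code, exception) as sketched here; the output comments are
what the mapping above implies:

    import org.apache.kafka.common.protocol.Errors;

    public class ErrorCodeRoundTrip {
        public static void main(String[] args) {
            System.out.println(Errors.UNABLE_TO_CREATE_TOPIC.code());  // 14
            System.out.println(Errors.TOPIC_EXISTS.code());            // 15
            // a broker response carrying code 15 maps back to the new exception
            Errors error = Errors.forCode((short) 15);
            System.out.println(error.exception().getClass().getSimpleName());  // TopicExistsException
        }
    }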
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..1088519 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -362,6 +362,30 @@ public class Protocol {
     public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
     public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
+    /* CreateTopic api */
+    public static Schema CREATE_TOPIC_REQUEST_V0 = new Schema(new Field("topics",
+                                                                        new ArrayOf(STRING),
+                                                                        "An array of topic names to create."),
+                                                              new Field("num_partitions",
+                                                                        INT32,
+                                                                        "Number of partitions per topic"),
+                                                              new Field("replication_factor",
+                                                                        INT32,
+                                                                        "Replication factor for the topic"),
+                                                              new Field("topic_config",
+                                                                        new ArrayOf(STRING),
+                                                                        "A topic configuration override for the topic being created."));
+
+    public static Schema CREATE_TOPIC_RESULT = new Schema(new Field("topic", STRING, "The name of the topic."),
+                                                          new Field("topic_error_code", INT16));
+
+    public static Schema CREATE_TOPIC_RESPONSE_V0 = new Schema(new Field("create_topic_results",
+                                                                         new ArrayOf(CREATE_TOPIC_RESULT),
+                                                                         "An array of per-topic creation results."));
+
+    public static Schema[] CREATE_TOPIC_REQUEST = new Schema[] {CREATE_TOPIC_REQUEST_V0};
+    public static Schema[] CREATE_TOPIC_RESPONSE = new Schema[] {CREATE_TOPIC_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions */
     public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -381,6 +405,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+        REQUESTS[ApiKeys.CREATE_TOPIC.id] = CREATE_TOPIC_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -393,6 +418,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+        RESPONSES[ApiKeys.CREATE_TOPIC.id] = CREATE_TOPIC_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())
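
For reference, a sketch of a v0 request body built directly against the schema
registered above; the field names come from this patch, the values are
illustrative:

    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Struct;

    public class CreateTopicSchemaSketch {
        public static void main(String[] args) {
            Struct request = new Struct(Protocol.CREATE_TOPIC_REQUEST_V0);
            request.set("topics", new Object[] {"my-topic"});
            request.set("num_partitions", 3);
            request.set("replication_factor", 2);
            // topic_config entries are flattened key=value strings
            request.set("topic_config", new Object[] {"segment.bytes=1048576"});
            System.out.println(request.sizeOf() + " bytes when serialized");
        }
    }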
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicRequest.java
new file mode 100644
index 0000000..4c87321
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicRequest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class CreateTopicRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CREATE_TOPIC.id);
+    private static String TOPICS_KEY_NAME = "topics";
+    private static String NUM_PARTITIONS_KEY_NAME = "num_partitions";
+    private static String REPLICATION_FACTOR_KEY_NAME = "replication_factor";
+    private static String TOPIC_OPTIONS_KEY_NAME = "topic_config";
+    private static final int DEFAULT_NUM_PARTITIONS = 1;
+    private static final int DEFAULT_REPLICATION_FACTOR = 1;
+    private static final List<String> DEFAULT_TOPIC_CONFIG = new ArrayList<String>();
+
+    private final List<String> topics;
+    private final int numPartitions;
+    private final int replicationFactor;
+    private final List<String> topicConfig;
+
+    public CreateTopicRequest(List<String> topics) {
+        this(topics, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR, DEFAULT_TOPIC_CONFIG);
+    }
+
+    public CreateTopicRequest(List<String> topics, int numPartitions, int replicationFactor) {
+        this(topics, numPartitions, replicationFactor, DEFAULT_TOPIC_CONFIG);
+    }
+
+    public CreateTopicRequest(List<String> topics, int numPartitions, int replicationFactor, List<String> topicConfig) {
+        super(new Struct(curSchema));
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        struct.set(NUM_PARTITIONS_KEY_NAME, numPartitions);
+        struct.set(REPLICATION_FACTOR_KEY_NAME, replicationFactor);
+        struct.set(TOPIC_OPTIONS_KEY_NAME, topicConfig.toArray());
+        this.topics = topics;
+        this.numPartitions = numPartitions;
+        this.replicationFactor = replicationFactor;
+        this.topicConfig = topicConfig;
+    }
+
+    public CreateTopicRequest(Struct struct) {
+        super(struct);
+        Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
+        topics = new ArrayList<String>();
+        for (Object topicObj: topicArray) {
+            topics.add((String) topicObj);
+        }
+        numPartitions = struct.hasField(NUM_PARTITIONS_KEY_NAME) ? struct.getInt(NUM_PARTITIONS_KEY_NAME) : DEFAULT_NUM_PARTITIONS;
+        replicationFactor = struct.hasField(REPLICATION_FACTOR_KEY_NAME) ? struct.getInt(REPLICATION_FACTOR_KEY_NAME) : DEFAULT_REPLICATION_FACTOR;
+        Object[] topicConfigArray = struct.getArray(TOPIC_OPTIONS_KEY_NAME);
+        topicConfig = new ArrayList<String>();
+        for (Object topicConfigObj : topicConfigArray) {
+            topicConfig.add((String) topicConfigObj);
+        }
+    }
+
+    public List<String> topics() {
+        return topics;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+
+    public int getReplicationFactor() {
+        return replicationFactor;
+    }
+
+    public List<String> getTopicOptions() {
+        return topicConfig;
+    }
+
+    public static CreateTopicRequest parse(ByteBuffer buffer) {
+        return new CreateTopicRequest(((Struct) curSchema.read(buffer)));
+    }
+}
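
For reference, a serialization round trip for the class above, assuming the
sizeOf/writeTo helpers inherited from AbstractRequestResponse:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.kafka.common.requests.CreateTopicRequest;

    public class CreateTopicRequestRoundTrip {
        public static void main(String[] args) {
            CreateTopicRequest request = new CreateTopicRequest(Arrays.asList("my-topic"), 3, 2);
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.rewind();
            CreateTopicRequest parsed = CreateTopicRequest.parse(buffer);
            System.out.println(parsed.topics());               // [my-topic]
            System.out.println(parsed.getNumPartitions());     // 3
            System.out.println(parsed.getReplicationFactor()); // 2
        }
    }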
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicResponse.java
new file mode 100644
index 0000000..4f5ea9d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicResponse.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class CreateTopicResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CREATE_TOPIC.id);
+    private static String CREATE_TOPIC_RESULTS_KEY_NAME = "create_topic_results";
+    private static String TOPIC_KEY_NAME = "topic";
+    private static String TOPIC_ERROR_CODE = "topic_error_code";
+
+    private final Map<String, Errors> topics;
+
+    public CreateTopicResponse(Map<String, Errors> topics) {
+        super(new Struct(curSchema));
+        this.topics = topics;
+        List<Struct> createTopicResultsArray = new ArrayList<Struct>();
+        for (String topic : topics.keySet()) {
+            Struct createTopicResult = struct.instance(CREATE_TOPIC_RESULTS_KEY_NAME);
+            createTopicResult.set(TOPIC_KEY_NAME, topic);
+            createTopicResult.set(TOPIC_ERROR_CODE, topics.get(topic).code());
+            createTopicResultsArray.add(createTopicResult);
+        }
+        struct.set(CREATE_TOPIC_RESULTS_KEY_NAME, createTopicResultsArray.toArray());
+    }
+
+    public CreateTopicResponse(Struct struct) {
+        super(struct);
+        topics = new HashMap<String, Errors>();
+        Object[] createTopicResultStructs = (Object[]) struct.get(CREATE_TOPIC_RESULTS_KEY_NAME);
+        for (int i = 0; i < createTopicResultStructs.length; i++) {
+            Struct createTopicResult = (Struct) createTopicResultStructs[i];
+            Short topicError = createTopicResult.getShort(TOPIC_ERROR_CODE);
+            String topic = createTopicResult.getString(TOPIC_KEY_NAME);
+            topics.put(topic, Errors.forCode(topicError));
+        }
+    }
+
+    public Map<String, Errors> topics() {
+        return topics;
+    }
+
+    public static CreateTopicResponse parse(ByteBuffer buffer) {
+        return new CreateTopicResponse(((Struct) curSchema.read(buffer)));
+    }
+
+}
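
For reference, a sketch of how a client consumes the per-topic result codes;
NONE and TOPIC_EXISTS both mean the topic is usable afterwards, mirroring
handleCreateTopicResponse in NetworkClient:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.CreateTopicResponse;

    public class CreateTopicResponseSketch {
        public static void main(String[] args) {
            Map<String, Errors> results = new HashMap<String, Errors>();
            results.put("new-topic", Errors.NONE);
            results.put("existing-topic", Errors.TOPIC_EXISTS);

            CreateTopicResponse response = new CreateTopicResponse(results);
            for (Map.Entry<String, Errors> entry : response.topics().entrySet()) {
                boolean settled = entry.getValue() == Errors.NONE
                               || entry.getValue() == Errors.TOPIC_EXISTS;
                System.out.println(entry.getKey() + " settled=" + settled);
            }
        }
    }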
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 1a55242..a12f581 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -37,7 +37,8 @@ public class NetworkClientTest {
     private int nodeId = 1;
     private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     private Node node = cluster.nodes().get(0);
-    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
+    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024,
+                                                     64 * 1024, true, 1, 1, new ArrayList<String>());
 
     @Before
     public void setup() {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index df37fc6..d7eacb0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -47,6 +48,8 @@ public class RequestResponseTest {
                 createListOffsetResponse(),
                 createMetadataRequest(),
                 createMetadataResponse(),
+                createTopicRequest(),
+                createTopicResponse(),
                 createOffsetCommitRequest(),
                 createOffsetCommitResponse(),
                 createOffsetFetchRequest(),
@@ -137,6 +140,16 @@ public class RequestResponseTest {
         return new MetadataResponse(cluster);
     }
 
+    private AbstractRequestResponse createTopicRequest() {
+        return new CreateTopicRequest(Arrays.asList("topic1"), 1, 1);
+    }
+
+    private AbstractRequestResponse createTopicResponse() {
+        Map<String, Errors> createTopicResult = new HashMap<String, Errors>();
+        createTopicResult.put("topic1", Errors.NONE);
+        return new CreateTopicResponse(createTopicResult);
+    }
+
     private AbstractRequestResponse createOffsetCommitRequest() {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
         commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, ""));
diff --git a/core/src/main/scala/kafka/api/CreateTopicRequest.scala b/core/src/main/scala/kafka/api/CreateTopicRequest.scala
new file mode 100644
index 0000000..b902cbc
--- /dev/null
+++ b/core/src/main/scala/kafka/api/CreateTopicRequest.scala
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import java.util.Properties
+import kafka.api.ApiUtils._
+import collection.mutable.ListBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
+
+object CreateTopicRequest extends Logging {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  /**
+   * CreateTopicRequest has the following format:
+   * number of topics (4 bytes), list of topics (2 bytes + topic.length per topic), numPartitions (4 bytes), replicationFactor (4 bytes),
+   * number of topicConfigs (4 bytes), list of topicConfigs (2 bytes + topicConfig.length per entry, each serialized as key=value)
+   */
+
+  def readFrom(buffer: ByteBuffer): CreateTopicRequest = {
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+    val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+    val topics = new ListBuffer[String]()
+    for(i <- 0 until numTopics)
+      topics += readShortString(buffer)
+    val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
+    val replicationFactor = readIntInRange(buffer, "replication factor", (0, Int.MaxValue))
+    val topicConfig = new Properties()
+    val numTopicConfigs = readIntInRange(buffer, "number of topic options", (0, Int.MaxValue))
+    for (i <- 0 until numTopicConfigs) {
+      val topicConfigPair = readShortString(buffer).split("""\s*=\s*""")
+      topicConfig.setProperty(topicConfigPair(0).trim, topicConfigPair(1).trim)
+    }
+    new CreateTopicRequest(versionId, correlationId, clientId, topics.toList, numPartitions, replicationFactor, topicConfig)
+  }
+}
+
+case class CreateTopicRequest(val versionId: Short,
+                              val correlationId: Int,
+                              val clientId: String,
+                              val topics: Seq[String],
+                              val numPartitions: Int,
+                              val replicationFactor: Int,
+                              val topicConfig: Properties = new Properties)
+ extends RequestOrResponse(Some(RequestKeys.CreateTopicKey)){
+
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(topics.size)
+    topics.foreach(topic => writeShortString(buffer, topic))
+    buffer.putInt(numPartitions)
+    buffer.putInt(replicationFactor)
+    buffer.putInt(topicConfig.size)
+    topicConfig.stringPropertyNames.toArray.foreach(
+      topicConfigKey => writeShortString(buffer, "%s=%s".format(topicConfigKey, topicConfig.getProperty(topicConfigKey.toString))))
+
+  }
+
+  def sizeInBytes(): Int = {
+    2 +  /* version id */
+    4 + /* correlation id */
+    shortStringLength(clientId)  + /* client id */
+    4 + /* number of topics */
+    topics.foldLeft(0)(_ + shortStringLength(_)) + /* topics */
+    4 + /* number of partitions */
+    4 + /* replication factor per topic */
+    4 + /* number of topic configs */
+    topicConfig.stringPropertyNames.toArray.map(topicConfigKey => "%s=%s".format(topicConfigKey,topicConfig.getProperty(topicConfigKey.toString)))
+      .foldLeft(0)(_ + shortStringLength(_))
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val createTopicResult = topics.map {
+      topic => CreateTopicResult(topic, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = CreateTopicResponse(createTopicResult, correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  override def describe(details: Boolean): String = {
+    val createTopicRequest = new StringBuilder
+    createTopicRequest.append("Name: " + this.getClass.getSimpleName)
+    createTopicRequest.append("; Version: " + versionId)
+    createTopicRequest.append("; CorrelationId: " + correlationId)
+    createTopicRequest.append("; ClientId: " + clientId)
+    if(details) {
+      createTopicRequest.append("; Topics: " + topics.mkString(","))
+      createTopicRequest.append("; numPartitions: "+ numPartitions)
+      createTopicRequest.append("; replicationFactor: "+ replicationFactor)
+      createTopicRequest.append("; topicConfig: "+ topicConfig)
+    }
+    createTopicRequest.toString()
+  }
+}
diff --git a/core/src/main/scala/kafka/api/CreateTopicResponse.scala b/core/src/main/scala/kafka/api/CreateTopicResponse.scala
new file mode 100644
index 0000000..3692d4d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/CreateTopicResponse.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.api.ApiUtils._
+import java.nio.ByteBuffer
+import collection.mutable.ListBuffer
+
+object CreateTopicResponse {
+
+  def readFrom(buffer: ByteBuffer): CreateTopicResponse = {
+    val correlationId = buffer.getInt
+    val createTopicCount = buffer.getInt
+    val createTopicResults = (0 until createTopicCount).map(_ => CreateTopicResult.readFrom(buffer))
+    new CreateTopicResponse(createTopicResults, correlationId)
+  }
+}
+
+case class CreateTopicResponse(createTopicResults: Seq[CreateTopicResult],
+                               correlationId: Int)
+    extends RequestOrResponse() {
+  val sizeInBytes: Int = {
+    4 + 4 + createTopicResults.map(_.sizeInBytes).sum
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    /* created topics */
+    buffer.putInt(createTopicResults.size)
+    createTopicResults.foreach(_.writeTo(buffer))
+  }
+
+  override def describe(details: Boolean): String = { toString }
+}
diff --git a/core/src/main/scala/kafka/api/CreateTopicResult.scala b/core/src/main/scala/kafka/api/CreateTopicResult.scala
new file mode 100644
index 0000000..ce2b702
--- /dev/null
+++ b/core/src/main/scala/kafka/api/CreateTopicResult.scala
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import kafka.utils.Logging
+import kafka.common._
+
+object CreateTopicResult {
+  def readFrom(buffer: ByteBuffer): CreateTopicResult = {
+    val topic = readShortString(buffer)
+    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
+    new CreateTopicResult(topic, errorCode)
+  }
+}
+
+case class CreateTopicResult(topic: String, errorCode: Short = ErrorMapping.NoError) {
+  def sizeInBytes: Int = {
+    shortStringLength(topic) +
+    2 /* error code */
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    /* topic */
+    writeShortString(buffer, topic)
+    /* error code */
+    buffer.putShort(errorCode)
+
+  }
+
+  override def toString(): String = {
+    val createTopicResult = new StringBuilder
+    createTopicResult.append("Topic: "+ topic)
+    createTopicResult.append("; ErrorCode: "+ errorCode)
+    createTopicResult.toString()
+  }
+}
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..195c052 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -34,6 +34,7 @@ object RequestKeys {
   val ConsumerMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
+  val CreateTopicKey: Short = 13
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -48,7 +49,8 @@ object RequestKeys {
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
         ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
         JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom),
+        CreateTopicKey -> ("CreateTopic", CreateTopicRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..f009598 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -22,13 +22,12 @@ import kafka.api._
 import kafka.producer._
 import kafka.common.{ErrorMapping, KafkaException}
 import kafka.utils.{Utils, Logging}
-import java.util.Properties
 import util.Random
- import kafka.network.BlockingChannel
- import kafka.utils.ZkUtils._
- import org.I0Itec.zkclient.ZkClient
- import java.io.IOException
-
+import kafka.network.BlockingChannel
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.ZkClient
+import java.io.IOException
+import java.util.Properties
  /**
  * Helper functions common to clients (producer, consumer, or admin)
  */
@@ -92,8 +91,43 @@ object ClientUtils extends Logging{
     fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
   }
 
+
+  def createTopic(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig,
+                  correlationId: Int): CreateTopicResponse = {
+    var createTopicSucceeded: Boolean = false
+    var i: Int = 0
+    val topicConfig = parseTopicConfig(producerConfig.topicConfig)
+    val createTopicRequest = new CreateTopicRequest(CreateTopicRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq, producerConfig.numPartitions, producerConfig.replicationFactor, topicConfig)
+    var createTopicResponse : CreateTopicResponse = null
+    var t: Throwable = null
+    val shuffledBrokers = Random.shuffle(brokers)
+    while(i < shuffledBrokers.size && !createTopicSucceeded) {
+      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
+      debug("creating topic on  broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
+      try {
+        createTopicResponse = producer.send(createTopicRequest)
+        createTopicSucceeded = true
+      }
+      catch {
+        case e: Throwable =>
+          warn("creating topic with correlation id %d for topics [%s] from broker [%s] failed"
+            .format(correlationId, topics, shuffledBrokers(i).toString))
+          t = e
+      } finally {
+        i = i + 1
+        producer.close()
+      }
+    }
+    if(!createTopicSucceeded) {
+      throw new KafkaException("creating topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
+    } else {
+      debug("Successfully created %d topic(s) %s".format(topics.size, topics))
+    }
+    createTopicResponse
+  }
+
   /**
-   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   * Parse a list of broker urls in the form host1:port1, host2:port2, ...
    */
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
@@ -108,6 +142,17 @@ object ClientUtils extends Logging{
     })
   }
 
+  /**
+   * Parse a list of strings of the form key=value into a Properties object
+   */
+  def parseTopicConfig(topicConfigList: Seq[String]) : Properties = {
+    val topicConfigProps = new Properties()
+    topicConfigList.foreach { topicConfigStr =>
+      val topicConfigPair = topicConfigStr.split("""\s*=\s*""")
+      topicConfigProps.setProperty(topicConfigPair(0), topicConfigPair(1))
+    }
+    topicConfigProps
+  }
    /**
     * Creates a blocking channel to a random broker
     */
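
For reference, a standalone Java illustration of the key=value handling that
parseTopicConfig performs above (same split regex; not the patch code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    public class TopicConfigParseSketch {
        public static void main(String[] args) {
            List<String> topicConfigList = Arrays.asList("segment.bytes = 1048576", "cleanup.policy=compact");
            Properties props = new Properties();
            for (String pair : topicConfigList) {
                String[] kv = pair.split("\\s*=\\s*");  // '=' with optional surrounding whitespace
                props.setProperty(kv[0], kv[1]);
            }
            System.out.println(props);
        }
    }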
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 5559d26..0ceeb6b 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,7 +23,7 @@ import java.lang.Throwable
 import scala.Predef._
 
 /**
- * A bi-directional mapping between error codes and exceptions x  
+ * A bi-directional mapping between error codes and exceptions
  */
 object ErrorMapping {
   val EmptyByteBuffer = ByteBuffer.allocate(0)
@@ -46,8 +46,10 @@ object ErrorMapping {
   val OffsetsLoadInProgressCode: Short = 14
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
+  val UnableToCreateTopicCode: Short = 17
+  val TopicExistsCode: Short = 18
 
-  private val exceptionToCode = 
+  private val exceptionToCode =
     Map[Class[Throwable], Short](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
@@ -63,15 +65,17 @@ object ErrorMapping {
       classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode,
       classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
       classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
-      classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode
+      classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
+      classOf[UnableToCreateTopicException].asInstanceOf[Class[Throwable]] -> UnableToCreateTopicCode,
+      classOf[TopicExistsException].asInstanceOf[Class[Throwable]] -> TopicExistsCode
     ).withDefaultValue(UnknownCode)
-  
+
   /* invert the mapping */
-  private val codeToException = 
+  private val codeToException =
     (Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
-  
+
   def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
-  
+
   def maybeThrowException(code: Short) =
     if(code != 0)
       throw codeToException(code).newInstance()
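The new codes round-trip through the bi-directional mapping like the existing
ones; a small sketch (assuming kafka.common.TopicExistsException, as referenced
by the mapping above):

    import kafka.common.{ErrorMapping, TopicExistsException}

    val code = ErrorMapping.codeFor(classOf[TopicExistsException].asInstanceOf[Class[Throwable]])
    assert(code == ErrorMapping.TopicExistsCode)
    // Throws UnableToCreateTopicException on the receiving side:
    ErrorMapping.maybeThrowException(ErrorMapping.UnableToCreateTopicCode)
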
diff --git a/core/src/main/scala/kafka/common/UnableToCreateTopicException.scala b/core/src/main/scala/kafka/common/UnableToCreateTopicException.scala
new file mode 100644
index 0000000..8702888
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnableToCreateTopicException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class UnableToCreateTopicException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
diff --git a/core/src/main/scala/kafka/javaapi/CreateTopicRequest.scala b/core/src/main/scala/kafka/javaapi/CreateTopicRequest.scala
new file mode 100644
index 0000000..ba21890
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/CreateTopicRequest.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import java.util.Properties
+import kafka.api._
+import scala.collection.mutable
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+
+class CreateTopicRequest(val versionId: Short,
+                           val correlationId: Int,
+                           val clientId: String,
+                           val topics: java.util.List[String],
+                           val numPartitions: Int,
+                           val replicationFactor: Int,
+                           val topicConfig: Properties = new Properties)
+    extends RequestOrResponse(Some(kafka.api.RequestKeys.CreateTopicKey)) {
+
+  val underlying: kafka.api.CreateTopicRequest = {
+    import scala.collection.JavaConversions._
+    new kafka.api.CreateTopicRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String], numPartitions, replicationFactor, topicConfig)
+  }
+
+  def this(topics: java.util.List[String], numPartitions: Int, replicationFactor: Int) =
+    this(kafka.api.CreateTopicRequest.CurrentVersion, 0, kafka.api.CreateTopicRequest.DefaultClientId, topics, numPartitions, replicationFactor)
+
+  def this(topics: java.util.List[String], numPartitions: Int, replicationFactor: Int, topicConfig: Properties) =
+    this(kafka.api.CreateTopicRequest.CurrentVersion, 0, kafka.api.CreateTopicRequest.DefaultClientId, topics, numPartitions, replicationFactor, topicConfig)
+
+  def this(topics: java.util.List[String], correlationId: Int, numPartitions: Int, replicationFactor: Int) =
+    this(kafka.api.CreateTopicRequest.CurrentVersion, correlationId, kafka.api.CreateTopicRequest.DefaultClientId, topics, numPartitions, replicationFactor)
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+
+  def sizeInBytes: Int = underlying.sizeInBytes()
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def describe(details: Boolean): String = {
+    val createTopicRequest = new StringBuilder
+    createTopicRequest.append("Name: " + this.getClass.getSimpleName)
+    createTopicRequest.append("; Version: " + versionId)
+    createTopicRequest.append("; CorrelationId: " + correlationId)
+    createTopicRequest.append("; ClientId: " + clientId)
+    if(details) {
+      createTopicRequest.append("; Topics: ")
+      val topicIterator = topics.iterator()
+      while (topicIterator.hasNext) {
+        val topic = topicIterator.next()
+        createTopicRequest.append("%s".format(topic))
+        if(topicIterator.hasNext)
+          createTopicRequest.append(",")
+      }
+      createTopicRequest.append("; numPartitions: "+ numPartitions)
+      createTopicRequest.append("; repliactionFactor: "+ replicationFactor)
+      createTopicRequest.append("; topicConfig: "+topicConfig)
+    }
+    createTopicRequest.toString()
+  }
+}
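A rough construction sketch for the javaapi wrapper; the topic names, counts
and override below are illustrative, and JavaConversions bridges the Scala call
site to the java.util.List parameter:

    import java.util.Properties
    import scala.collection.JavaConversions.seqAsJavaList

    val overrides = new Properties()
    overrides.setProperty("cleanup.policy", "compact")
    // Delegates to kafka.api.CreateTopicRequest via the `underlying` field.
    val request = new kafka.javaapi.CreateTopicRequest(
      seqAsJavaList(Seq("topic-a", "topic-b")), 3, 2, overrides)
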
diff --git a/core/src/main/scala/kafka/javaapi/CreateTopicResponse.scala b/core/src/main/scala/kafka/javaapi/CreateTopicResponse.scala
new file mode 100644
index 0000000..331ddf0
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/CreateTopicResponse.scala
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+class CreateTopicResponse(private val underlying: kafka.api.CreateTopicResponse) {
+  def sizeInBytes: Int = underlying.sizeInBytes
+
+  def createTopicResults: java.util.List[CreateTopicResult] = {
+    import kafka.javaapi.CreateTopicResultListImplicits._
+    underlying.createTopicResults
+  }
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherCreateTopicResponse = other.asInstanceOf[kafka.javaapi.CreateTopicResponse]
+    this.underlying.equals(otherCreateTopicResponse.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.CreateTopicResponse]
+
+  override def hashCode = underlying.hashCode
+
+  override def toString = underlying.toString
+}
diff --git a/core/src/main/scala/kafka/javaapi/CreateTopicResult.scala b/core/src/main/scala/kafka/javaapi/CreateTopicResult.scala
new file mode 100644
index 0000000..adc7f32
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/CreateTopicResult.scala
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import scala.collection.JavaConversions
+
+private[javaapi] object CreateTopicResultListImplicits {
+  implicit def toJavaCreateTopicResultList(createTopicResultSeq: Seq[kafka.api.CreateTopicResult]):
+  java.util.List[kafka.javaapi.CreateTopicResult] = {
+    import JavaConversions._
+    createTopicResultSeq.map(new kafka.javaapi.CreateTopicResult(_))
+  }
+}
+
+class CreateTopicResult(private val underlying: kafka.api.CreateTopicResult) {
+  def topic: String = underlying.topic
+  def errorCode: Short = underlying.errorCode
+}
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 13a8aa6..45ce256 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -86,18 +86,25 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
       if(tmd.errorCode == ErrorMapping.NoError) {
         topicPartitionInfo.put(tmd.topic, tmd)
-      } else
+      } else if(producerConfig.autoCreateTopics && tmd.errorCode == ErrorMapping.UnknownTopicOrPartitionCode) {
+        trace("Creating topic %s ".format(tmd.topic))
+        val createTopicResponse = ClientUtils.createTopic(topics, brokers, producerConfig, correlationId)
+        createTopicResponse.createTopicResults.foreach(ctr => {
+          if (ctr.errorCode != ErrorMapping.NoError)
+            warn("Error while creating topic [%s] , error [%s]".format(ctr.topic, ctr.errorCode))
+        })
+      } else {
         warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
-      tmd.partitionsMetadata.foreach(pmd =>{
-        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
-          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
-            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
-        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
-      })
-    })
+        tmd.partitionsMetadata.foreach(pmd =>{
+          if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
+            warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
+              ErrorMapping.exceptionFor(pmd.errorCode).getClass))
+          } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
+        })
+      }})
     producerPool.updateProducer(topicsMetadata)
   }
-  
+
 }
 
 case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 3cdf23d..db1a2ff 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -113,5 +113,29 @@ class ProducerConfig private (val props: VerifiableProperties)
    */
   val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
 
+  /**
+    * Enable auto creation of topics on the producer.
+    * If set to true, an attempt to produce to a non-existent topic will automatically create it
+    * with the configured replication factor and number of partitions.
+    */
+  val autoCreateTopics = props.getBoolean("auto.create.topics.enable", true)
+
+  /**
+   * The number of partitions for auto-created topics. If auto.create.topics.enable is set to true,
+   * the producer uses topics.num.partitions as the partition count for the topics it creates.
+   */
+  val numPartitions = props.getInt("topics.num.partitions", 1)
+
+  /**
+    * The replication factor for auto-created topics. If auto.create.topics.enable is set to true,
+    * the producer uses topics.replication.factor as the replication factor for the topics it creates.
+    */
+  val replicationFactor = props.getInt("topics.replication.factor", 1)
+
+  /**
+    * Optional per-topic configuration overrides for auto-created topics, as a comma-separated list of key=value pairs.
+    */
+  val topicConfig = Utils.parseCsvList(props.getString("topics.config", null))
+
   validate(this)
 }
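Taken together, the producer-side knobs introduced by this diff (defaults in
parentheses); the values below are purely illustrative:

    // auto.create.topics.enable (true) - create missing topics from the producer
    // topics.num.partitions     (1)    - partitions for auto-created topics
    // topics.replication.factor (1)    - replication factor for auto-created topics
    // topics.config             (none) - csv of key=value per-topic overrides
    val props = new java.util.Properties()
    props.put("auto.create.topics.enable", "true")
    props.put("topics.num.partitions", "4")
    props.put("topics.replication.factor", "2")
    props.put("topics.config", "retention.ms=604800000")
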
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f007..fb871cc 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -113,6 +113,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     TopicMetadataResponse.readFrom(response.buffer)
   }
 
+  def send(request: CreateTopicRequest): CreateTopicResponse = {
+    val response = doSend(request)
+    CreateTopicResponse.readFrom(response.buffer)
+  }
+
   def close() = {
     lock synchronized {
       disconnect()
@@ -132,7 +137,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       case e: Exception => error("Error on disconnect: ", e)
     }
   }
-    
+
   private def connect(): BlockingChannel = {
     if (!blockingChannel.isConnected && !shutdown) {
       try {
@@ -155,4 +160,3 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     }
   }
 }
-
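The new overload makes it possible to issue a create-topic call directly over a
SyncProducer; a hedged sketch (host, port, client id and topic are assumptions,
and the SyncProducerConfig is built the usual way from host/port properties):

    import java.util.Properties
    import kafka.api.CreateTopicRequest
    import kafka.common.ErrorMapping
    import kafka.producer.{SyncProducer, SyncProducerConfig}

    val props = new Properties()
    props.put("host", "localhost") // illustrative broker address
    props.put("port", "9092")
    val producer = new SyncProducer(new SyncProducerConfig(props))
    try {
      val request = new CreateTopicRequest(CreateTopicRequest.CurrentVersion, 0,
        "admin-client", Seq("my-topic"), 1, 1, new Properties)
      val response = producer.send(request)
      // Surface any per-topic failure (including TopicExists) as the mapped exception.
      response.createTopicResults.foreach(r => ErrorMapping.maybeThrowException(r.errorCode))
    } finally {
      producer.close()
    }
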
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb94673..590b37d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,6 +27,7 @@ import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
 import kafka.utils.{SystemTime, Logging}
 
+import java.util.Properties
 import scala.collection._
 
 import org.I0Itec.zkclient.ZkClient
@@ -67,6 +68,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+        case RequestKeys.CreateTopicKey => handleCreateTopicRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -224,9 +226,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
-    def this(key: TopicAndPartition, throwable: Throwable) = 
+    def this(key: TopicAndPartition, throwable: Throwable) =
       this(key, -1L, -1L, Some(throwable))
-    
+
     def errorCode = error match {
       case None => ErrorMapping.NoError
       case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]])
@@ -343,7 +345,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
-   * Service the offset request API 
+   * Service the offset request API
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
@@ -366,7 +368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val hw = localReplica.highWatermark.messageOffset
             if (allOffsets.exists(_ > hw))
               hw +: allOffsets.dropWhile(_ > hw)
-            else 
+            else
               allOffsets
           }
         }
@@ -390,19 +392,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
-  
+
   def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     logManager.getLog(topicAndPartition) match {
-      case Some(log) => 
+      case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
-      case None => 
+      case None =>
         if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
           Seq(0L)
         else
           Nil
     }
   }
-  
+
   def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -449,22 +451,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
-          try {
-            if (topic == OffsetManager.OffsetsTopicName) {
-              AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+        if (topic == OffsetManager.OffsetsTopicName) {
+          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
                                      offsetManager.offsetsTopicConfig)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
-            }
-            else {
-              AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                   .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            }
-          } catch {
-            case e: TopicExistsException => // let it go, possibly another broker created this topic
-          }
+          info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+            .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+        } else if (AdminUtils.topicExists(zkClient, topic)) { // if the topic is in zk but doesn't exist in the metadata cache
           new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
         } else {
           new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
@@ -511,8 +504,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
 
     // get metadata (and create the topic if necessary)
-    val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
-
+    var offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
+    if(offsetsTopicMetadata.errorCode == ErrorMapping.UnknownTopicOrPartitionCode) {
+      createTopic(Set(OffsetManager.OffsetsTopicName), config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+                  offsetManager.offsetsTopicConfig)
+      offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
+    }
     val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
 
     val response =
@@ -527,6 +524,36 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  /**
+   * Service the create topic request API
+   */
+  def handleCreateTopicRequest(request: RequestChannel.Request) {
+    val createTopicRequest = request.requestObj.asInstanceOf[CreateTopicRequest]
+    val createdTopicsList = createTopic(createTopicRequest.topics.toSet, createTopicRequest.numPartitions, createTopicRequest.replicationFactor, createTopicRequest.topicConfig)
+    val brokers = metadataCache.getAliveBrokers
+    trace("Sending createTopicResponse %s and brokers %s for correlation id %d to client %s".format(createdTopicsList.mkString(","), brokers.mkString(","), createTopicRequest.correlationId, createTopicRequest.clientId))
+    val response = new CreateTopicResponse(createdTopicsList, createTopicRequest.correlationId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
+  private def createTopic(topics: Set[String], numPartitions: Int, replicationFactor: Int, topicConfig: Properties): Seq[CreateTopicResult] = {
+    val createTopicResults: mutable.ListBuffer[CreateTopicResult] = new mutable.ListBuffer[CreateTopicResult]
+    for(topic <- topics) {
+      try {
+        AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, topicConfig)
+        trace("Auto creation of topic %s with %d partitions, replication factor %d and topicConfig %s is successful!"
+          .format(topic, numPartitions, replicationFactor, topicConfig))
+        createTopicResults += new CreateTopicResult(topic, ErrorMapping.NoError)
+      } catch {
+        case e: TopicExistsException =>
+          createTopicResults += new CreateTopicResult(topic, ErrorMapping.TopicExistsCode)
+        case e2: Exception =>
+          createTopicResults += new CreateTopicResult(topic, ErrorMapping.UnableToCreateTopicCode)
+      }
+    }
+    createTopicResults
+  }
+
   def close() {
     debug("Shutting down.")
     fetchRequestPurgatory.shutdown()
@@ -534,4 +561,3 @@ class KafkaApis(val requestChannel: RequestChannel,
     debug("Shut down complete.")
   }
 }
-
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 789e74c..b9c42ea 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -116,6 +116,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
   @Test
   def testNonExistentTopic() {
     // send a record with non-exist topic
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize, autoCreateTopics = false)
     val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer1.send(record).get
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..8b4021f 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -151,6 +151,15 @@ object SerializationTestUtils {
     new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
   }
 
+  def createTestCreateTopicRequest: CreateTopicRequest = {
+    new CreateTopicRequest(1, 1, "client 1", Seq(topic1, topic2), 1, 1)
+  }
+
+  def createTestCreateTopicResponse: CreateTopicResponse = {
+    val createTopicResult = new CreateTopicResult(topic1, ErrorMapping.NoError)
+    new CreateTopicResponse(Seq(createTopicResult), 1)
+  }
+
   def createTestOffsetCommitRequestV1: OffsetCommitRequest = {
     new OffsetCommitRequest("group 1", collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e1d8711..26015cd 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -353,6 +353,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
     props.put("compression.codec", compression.codec.toString)
+    props.put("topics.num.partitions", numParts.toString)
     val producer: Producer[Int, String] =
       createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
                      encoder = classOf[StringEncoder].getName,
@@ -375,6 +376,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     var messages: List[String] = Nil
     val props = new Properties()
     props.put("compression.codec", compression.codec.toString)
+    props.put("topics.num.partitions", numParts.toString)
     val producer: Producer[Int, String] =
       createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
                      encoder = classOf[StringEncoder].getName,
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 35dc071..617859b 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -102,30 +102,4 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
     assertEquals(1, partitionMetadataTopic2.head.replicas.size)
   }
-
-  def testAutoCreateTopic {
-    // auto create topic
-    val topic = "testAutoCreateTopic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
-    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
-
-    // retry the metadata for the auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    var partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dd71d81..c46aceb 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -338,4 +338,32 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       producer.close()
     }
   }
+
+  @Test
+  def testCreateTopic() {
+    val topic = "new-topic1"
+    val props = new Properties()
+    props.put("topics.num.partitions", "2")
+    props.put("topics.replication.factor", "2")
+    val producer1 = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      producerProps = props)
+    try {
+      producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
+      TestUtils.waitUntilTrue(() =>
+        AdminUtils.fetchTopicMetadataFromZk("new-topic1", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+        "Topic new-topic1 not created after timeout",
+        waitTime = zookeeper.tickTime)
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet(topic, 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message)
+      assertFalse("Message set should have another message", messageSet1.hasNext)
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c4e13c5..715e968 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -91,7 +91,7 @@ object TestUtils extends Logging {
         Utils.rm(f)
       }
     })
-    
+
     f
   }
 
@@ -376,7 +376,8 @@ object TestUtils extends Logging {
                         metadataFetchTimeout: Long = 3000L,
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
-                        retries: Int = 0) : KafkaProducer = {
+                        retries: Int = 0,
+                        autoCreateTopics: Boolean = true) : KafkaProducer = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -388,6 +389,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    producerProps.put(ProducerConfig.AUTO_CREATE_TOPICS_ENABLE, autoCreateTopics.toString)
     return new KafkaProducer(producerProps)
   }
 
-- 
1.8.5.2 (Apple Git-48)


From e3e816935f37443e437d0d3257c96767691d478d Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani 
Date: Tue, 12 Aug 2014 18:08:11 -0700
Subject: [PATCH 2/2] KAFKA-1507. Using GetOffsetShell against non-existent
 topic creates the topic unintentionally. Added CreateTopicRequest to the
 protocol, removed create topic from TopicMetadataRequest.

---
 core/src/main/scala/kafka/server/KafkaApis.scala          | 8 +-------
 core/src/main/scala/kafka/tools/GetOffsetShell.scala      | 5 +++--
 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala | 7 ++++---
 3 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 590b37d..4cc3fe7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -451,13 +451,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == OffsetManager.OffsetsTopicName) {
-          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
-                                     offsetManager.offsetsTopicConfig)
-          info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-            .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
-          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-        } else if (AdminUtils.topicExists(zkClient, topic)) { // if the topic is in zk but doesn't exist in the metadata cache
+        if (AdminUtils.topicExists(zkClient, topic)) { // if the topic is in zk but doesn't exist in the metadata cache
           new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
         } else {
           new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 9c6064e..1b0c440 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -20,6 +20,7 @@ package kafka.tools
 
 import kafka.consumer._
 import joptsimple._
+import kafka.common.ErrorMapping
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
@@ -57,7 +58,7 @@ object GetOffsetShell {
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
-                           
+
    if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
 
@@ -74,7 +75,7 @@ object GetOffsetShell {
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic) || topicsMetadata(0).errorCode == ErrorMapping.UnknownTopicOrPartitionCode) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
         "kafka-list-topic.sh to verify")
       System.exit(1)
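Both shells (GetOffsetShell here, SimpleConsumerShell below) now apply the same
three-way validity check before proceeding; a condensed sketch of the pattern,
with names taken from the surrounding code:

    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),
      metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    val valid = topicsMetadata.size == 1 &&
      topicsMetadata(0).topic == topic &&
      topicsMetadata(0).errorCode != ErrorMapping.UnknownTopicOrPartitionCode
    if (!valid)
      System.exit(1) // metadata missing or topic unknown; no auto-creation anymore
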
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 36314f4..7078536 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,6 +20,7 @@ package kafka.tools
 import joptsimple._
 import kafka.utils._
 import kafka.consumer._
+import kafka.common.ErrorMapping
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
@@ -93,7 +94,7 @@ object SimpleConsumerShell extends Logging {
         "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
         "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
-        
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 
@@ -126,7 +127,7 @@ object SimpleConsumerShell extends Logging {
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic) || topicsMetadata(0).errorCode == ErrorMapping.UnknownTopicOrPartitionCode) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
     }
-- 
1.8.5.2 (Apple Git-48)