From 6b6ee9749b5e084a2a50574ed2ea9af6702ab0e4 Mon Sep 17 00:00:00 2001
From: Andrii Biletskyi <andrbele@gmail.com>
Date: Mon, 8 Dec 2014 13:34:26 +0200
Subject: [PATCH] KAFKA-1694 - ClusterDiscoveryRequest, AdminRequest message;
 TopicCommand; basic shell functionality

---
 bin/kafka.sh                                       |  17 ++
 bin/windows/kafka.bat                              |  17 ++
 build.gradle                                       |  54 +++++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   4 +-
 .../org/apache/kafka/common/protocol/Protocol.java |  40 ++++
 .../apache/kafka/common/requests/AdminRequest.java |  66 +++++++
 .../kafka/common/requests/AdminResponse.java       |  55 ++++++
 .../common/requests/ClusterMetadataRequest.java    |  36 ++++
 .../common/requests/ClusterMetadataResponse.java   | 103 ++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  24 ++-
 core/src/main/scala/kafka/api/AdminRequest.scala   | 104 ++++++++++
 core/src/main/scala/kafka/api/AdminResponse.scala  |  49 +++++
 .../scala/kafka/api/ClusterMetadataRequest.scala   |  68 +++++++
 .../scala/kafka/api/ClusterMetadataResponse.scala  |  60 ++++++
 core/src/main/scala/kafka/api/RequestKeys.scala    |   6 +-
 .../api/admin/request/args/ParseException.scala    |  24 +++
 .../admin/request/args/TopicCommandArguments.scala | 126 ++++++++++++
 .../kafka/common/AdminRequestFailedException.scala |  26 +++
 .../src/main/scala/kafka/common/ErrorMapping.scala |   6 +-
 .../common/InvalidRequestTargetException.scala     |  24 +++
 .../controller/ControllerChannelManager.scala      |  13 +-
 .../scala/kafka/controller/KafkaController.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  99 ++++++++++
 .../main/scala/kafka/server/MetadataCache.scala    |   8 +
 .../scala/kafka/server/TopicCommandHelper.scala    | 212 +++++++++++++++++++++
 .../scala/unit/kafka/api/AdminRequestTest.scala    | 116 +++++++++++
 .../api/RequestResponseSerializationTest.scala     |  23 ++-
 settings.gradle                                    |   2 +-
 .../java/org/apache/kafka/cli/BaseCommandOpts.java |  89 +++++++++
 tools/src/main/java/org/apache/kafka/cli/Boot.java |  72 +++++++
 .../org/apache/kafka/cli/RequestDispatcher.java    | 110 +++++++++++
 .../src/main/java/org/apache/kafka/cli/Shell.java  | 209 ++++++++++++++++++++
 .../kafka/cli/command/AlterTopicCommand.java       | 134 +++++++++++++
 .../kafka/cli/command/ClearScreenCommand.java      |  38 ++++
 .../java/org/apache/kafka/cli/command/Command.java |  30 +++
 .../kafka/cli/command/CreateTopicCommand.java      | 123 ++++++++++++
 .../kafka/cli/command/DeleteTopicCommand.java      |  90 +++++++++
 .../kafka/cli/command/DescribeTopicCommand.java    |  83 ++++++++
 .../org/apache/kafka/cli/command/ExitCommand.java  |  38 ++++
 .../kafka/cli/command/ListTopicsCommand.java       |  79 ++++++++
 .../apache/kafka/cli/command/PrintHelpCommand.java |  38 ++++
 .../kafka/cli/command/TopicSwitchCommand.java      |  59 ++++++
 .../org/apache/kafka/cli/util/StringUtils.java     |  50 +++++
 43 files changed, 2614 insertions(+), 18 deletions(-)
 create mode 100755 bin/kafka.sh
 create mode 100755 bin/windows/kafka.bat
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AdminRequest.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AdminResponse.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java
 create mode 100644 core/src/main/scala/kafka/api/AdminRequest.scala
 create mode 100644 core/src/main/scala/kafka/api/AdminResponse.scala
 create mode 100644 core/src/main/scala/kafka/api/ClusterMetadataRequest.scala
 create mode 100644 core/src/main/scala/kafka/api/ClusterMetadataResponse.scala
 create mode 100644 core/src/main/scala/kafka/api/admin/request/args/ParseException.scala
 create mode 100644 core/src/main/scala/kafka/api/admin/request/args/TopicCommandArguments.scala
 create mode 100644 core/src/main/scala/kafka/common/AdminRequestFailedException.scala
 create mode 100644 core/src/main/scala/kafka/common/InvalidRequestTargetException.scala
 create mode 100644 core/src/main/scala/kafka/server/TopicCommandHelper.scala
 create mode 100644 core/src/test/scala/unit/kafka/api/AdminRequestTest.scala
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/Boot.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/Shell.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/Command.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java
 create mode 100644 tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java

diff --git a/bin/kafka.sh b/bin/kafka.sh
new file mode 100755
index 0000000..0211f01
--- /dev/null
+++ b/bin/kafka.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.cli.Boot $@
diff --git a/bin/windows/kafka.bat b/bin/windows/kafka.bat
new file mode 100755
index 0000000..7380549
--- /dev/null
+++ b/bin/windows/kafka.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat org.apache.kafka.cli.Boot %*
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 18f86e4..0c54820 100644
--- a/build.gradle
+++ b/build.gradle
@@ -177,19 +177,19 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_4', '2_11'] ) {
   }
 }
 
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_4', 'jar_core_2_11', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) {
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_4', 'jar_core_2_11', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'tools:jar']) {
 }
 
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_4', 'srcJar_2_11', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_4', 'srcJar_2_11', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'tools:srcJar']) { }
 
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_4', 'docsJar_2_11', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_4', 'docsJar_2_11', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'tools:docsJar']) { }
 
-tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_4', 'test_core_2_11', 'clients:test']) {
+tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_4', 'test_core_2_11', 'clients:test', 'tools:test']) {
 }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_4', 'releaseTarGz_2_11']) {
 }
-
+// TODO: add 'tools:uploadArchives' to uploadArchivesAll once publishing is configured for the tools module
 tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_4', 'uploadCoreArchives_2_11', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) {
 }
 
@@ -203,11 +203,13 @@ project(':core') {
 
   dependencies {
     compile project(':clients')
+    compile project(':tools')
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
     compile 'com.101tec:zkclient:0.3'
     compile 'com.yammer.metrics:metrics-core:2.2.0'
-    compile 'net.sf.jopt-simple:jopt-simple:3.2'
+    // TODO temporarily upgraded to get Parser.allowUnrecognizedOptions()
+    compile 'net.sf.jopt-simple:jopt-simple:4.8'
 
     testCompile 'junit:junit:4.1'
     testCompile 'org.easymock:easymock:3.0'
@@ -230,7 +232,6 @@ project(':core') {
   configurations {
     // manually excludes some unnecessary dependencies
     compile.exclude module: 'javax'
-    compile.exclude module: 'jline'
     compile.exclude module: 'jms'
     compile.exclude module: 'jmxri'
     compile.exclude module: 'jmxtools'
@@ -245,6 +246,9 @@ project(':core') {
     from (configurations.runtime) {
       exclude('kafka-clients*')
     }
+    from (configurations.runtime) {
+      include('kafka-tools*')
+    }
     into "$buildDir/dependant-libs-${scalaVersion}"
   }
 
@@ -374,3 +378,39 @@ project(':clients') {
   }
 
 }
+
+project(':tools') {
+    archivesBaseName = "kafka-tools"
+
+    dependencies {
+        compile "org.slf4j:slf4j-api:1.7.6"
+        //compile 'org.xerial.snappy:snappy-java:1.1.1.6'
+        //compile 'net.jpountz.lz4:lz4:1.2.0'
+
+        // TODO temporarily upgraded to get Parser.allowUnrecognizedOptions()
+        compile 'net.sf.jopt-simple:jopt-simple:4.8'
+        compile 'jline:jline:2.12'
+        compile project(':clients')
+
+        testCompile 'com.novocode:junit-interface:0.9'
+        testRuntime "$slf4jlog4j"
+    }
+
+    task testJar(type: Jar) {
+        classifier = 'test'
+        from sourceSets.test.output
+    }
+
+    test {
+        testLogging {
+            events "passed", "skipped", "failed"
+            exceptionFormat = 'full'
+        }
+    }
+
+//    javadoc {
+//        include "**/org/apache/kafka/clients/producer/*"
+//        include "**/org/apache/kafka/common/errors/*"
+//    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 109fc96..203a391 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -34,7 +34,9 @@ public enum ApiKeys {
     OFFSET_FETCH(9, "offset_fetch"),
     CONSUMER_METADATA(10, "consumer_metadata"),
     JOIN_GROUP(11, "join_group"),
-    HEARTBEAT(12, "heartbeat");
+    HEARTBEAT(12, "heartbeat"),
+    ADMIN(13, "admin"),
+    CLUSTER_METADATA(14, "cluster_metadata");
 
     private static ApiKeys[] codeToType;
     public static int MAX_API_KEY = -1;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..f7de022 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
 import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
@@ -362,6 +363,41 @@ public class Protocol {
     public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
     public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
+    /* Admin api */
+    public static Schema ADMIN_REQUEST_V0 = new Schema(new Field("utility",
+                                                                INT8,
+                                                                "A target subtype of the request."),
+                                                        new Field("command",
+                                                                INT8,
+                                                                "A request command subtype."),
+                                                        new Field("args",
+                                                                STRING,
+                                                                "Admin command arguments."));
+
+    public static Schema ADMIN_RESPONSE_V0 = new Schema(new Field("error_code",
+                                                                INT16),
+                                                        new Field("outcome",
+                                                                STRING,
+                                                                "Optional command response, or error description in case of exception"));
+
+    /* Cluster metadata api */
+    public static Schema CLUSTER_METADATA_REQUEST_V0 = new Schema();
+
+    public static Schema CLUSTER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code",
+                                                                            INT16),
+                                                                    new Field("brokers",
+                                                                            new ArrayOf(BROKER),
+                                                                            "Host and port information for all brokers."),
+                                                                    new Field("controller",
+                                                                            BROKER,
+                                                                            "Host and port information for a broker with controller's role"));
+
+    public static Schema[] ADMIN_REQUEST = new Schema[] {ADMIN_REQUEST_V0};
+    public static Schema[] ADMIN_RESPONSE = new Schema[] {ADMIN_RESPONSE_V0};
+
+    public static Schema[] CLUSTER_METADATA_REQUEST = new Schema[] {CLUSTER_METADATA_REQUEST_V0};
+    public static Schema[] CLUSTER_METADATA_RESPONSE = new Schema[] {CLUSTER_METADATA_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions */
     public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -381,6 +417,8 @@ public class Protocol {
         REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+        REQUESTS[ApiKeys.ADMIN.id] = ADMIN_REQUEST;
+        REQUESTS[ApiKeys.CLUSTER_METADATA.id] = CLUSTER_METADATA_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -393,6 +431,8 @@ public class Protocol {
         RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+        RESPONSES[ApiKeys.ADMIN.id] = ADMIN_RESPONSE;
+        RESPONSES[ApiKeys.CLUSTER_METADATA.id] = CLUSTER_METADATA_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AdminRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AdminRequest.java
new file mode 100644
index 0000000..c47a467
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AdminRequest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AdminRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.ADMIN.id);
+    private static String UTILITY_KEY_NAME = "utility";
+    private static String COMMAND_KEY_NAME = "command";
+    private static String ARGS_KEY_NAME = "args";
+
+    private final byte utility;
+    private final byte command;
+    private final String args;
+
+    public AdminRequest(byte utility, byte command, String args) {
+        super(new Struct(curSchema));
+
+        struct.set(UTILITY_KEY_NAME, utility);
+        struct.set(COMMAND_KEY_NAME, command);
+        struct.set(ARGS_KEY_NAME, args);
+        this.utility = utility;
+        this.command = command;
+        this.args = args;
+    }
+
+    public AdminRequest(Struct struct) {
+        super(struct);
+        utility = (Byte) struct.get(UTILITY_KEY_NAME);
+        command = (Byte) struct.get(COMMAND_KEY_NAME);
+        args = struct.getString(ARGS_KEY_NAME);
+    }
+
+    public byte utility() {
+        return utility;
+    }
+
+    public byte command() {
+        return command;
+    }
+
+    public String args() {
+        return args;
+    }
+
+    public static AdminRequest parse(ByteBuffer buffer) {
+        return new AdminRequest(((Struct) curSchema.read(buffer)));
+    }
+}
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AdminResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AdminResponse.java
new file mode 100644
index 0000000..f801a61
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AdminResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AdminResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.ADMIN.id);
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static String OUTCOME_KEY_NAME = "outcome";
+
+    private final short errorCode;
+    private final String outcome;
+
+    public AdminResponse(String outcome, short errorCode) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(OUTCOME_KEY_NAME, outcome);
+        this.errorCode = errorCode;
+        this.outcome = outcome;
+    }
+
+    public AdminResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        outcome = struct.getString(OUTCOME_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public String outcome() {
+        return outcome;
+    }
+
+    public static AdminResponse parse(ByteBuffer buffer) {
+        return new AdminResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java
new file mode 100644
index 0000000..bc03cc4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class ClusterMetadataRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CLUSTER_METADATA.id);
+
+    public ClusterMetadataRequest() {
+        super(new Struct(curSchema));
+    }
+
+    public ClusterMetadataRequest(Struct struct) {
+        super(struct);
+    }
+
+    public static ClusterMetadataRequest parse(ByteBuffer buffer) {
+        return new ClusterMetadataRequest(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java
new file mode 100644
index 0000000..a2be567
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ClusterMetadataResponse.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ClusterMetadataResponse extends AbstractRequestResponse {
+    private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CLUSTER_METADATA.id);
+    private static String ERROR_CODE_KEY_NAME = "error_code";
+    private static String BROKERS_KEY_NAME = "brokers";
+    private static String CONTROLLER_KEY_NAME = "controller";
+
+    // broker level field names
+    private static String NODE_ID_KEY_NAME = "node_id";
+    private static String HOST_KEY_NAME = "host";
+    private static String PORT_KEY_NAME = "port";
+
+    private final short errorCode;
+    private List<Node> brokers;
+    private Node controller;
+
+    public ClusterMetadataResponse(short errorCode, List<Node> brokers, Node controller) {
+        super(new Struct(curSchema));
+
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        List<Struct> brokerArray = new ArrayList<Struct>();
+        for (Node node : brokers) {
+            Struct broker = struct.instance(BROKERS_KEY_NAME);
+            broker.set(NODE_ID_KEY_NAME, node.id());
+            broker.set(HOST_KEY_NAME, node.host());
+            broker.set(PORT_KEY_NAME, node.port());
+            brokerArray.add(broker);
+        }
+        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
+        Struct coordinator = struct.instance(CONTROLLER_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, controller.id());
+        coordinator.set(HOST_KEY_NAME, controller.host());
+        coordinator.set(PORT_KEY_NAME, controller.port());
+        struct.set(CONTROLLER_KEY_NAME, coordinator);
+
+        this.errorCode = errorCode;
+        this.brokers = brokers;
+        this.controller = controller;
+    }
+
+    public ClusterMetadataResponse(Struct struct) {
+        super(struct);
+        Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
+        List<Node> brokersList = new LinkedList<Node>();
+        for (int i = 0; i < brokerStructs.length; i++) {
+            Struct broker = (Struct) brokerStructs[i];
+            int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+            String host = broker.getString(HOST_KEY_NAME);
+            int port = broker.getInt(PORT_KEY_NAME);
+            brokersList.add(new Node(nodeId, host, port));
+        }
+        brokers = brokersList;
+
+        Struct broker = (Struct) struct.get(CONTROLLER_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        controller = new Node(nodeId, host, port);
+
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public List<Node> brokers() {
+        return brokers;
+    }
+
+    public Node controller() {
+        return controller;
+    }
+
+    public static ClusterMetadataResponse parse(ByteBuffer buffer) {
+        return new ClusterMetadataResponse(((Struct) curSchema.read(buffer)));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index df37fc6..d838ab1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -52,7 +52,11 @@ public class RequestResponseTest {
                 createOffsetFetchRequest(),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
-                createProduceResponse());
+                createProduceResponse(),
+                createAdminRequest(),
+                createAdminResponse(),
+                createClusterMetadataRequest(),
+                createClusterMetadataResponse());
 
         for (AbstractRequestResponse req: requestList) {
             ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
@@ -170,4 +174,22 @@ public class RequestResponseTest {
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000));
         return new ProduceResponse(responseData);
     }
+
+    private AbstractRequestResponse createAdminRequest(){
+        return new AdminRequest((byte)1, (byte)2, "{\"topic\":\"my_topic\"}");
+    }
+
+    private AbstractRequestResponse createAdminResponse(){
+        return new AdminResponse("Parse exception.", (short)25);
+    }
+
+    private AbstractRequestResponse createClusterMetadataRequest(){
+        return new ClusterMetadataRequest();
+    }
+
+    private AbstractRequestResponse createClusterMetadataResponse(){
+        return new ClusterMetadataResponse((short) 25,
+                                            Arrays.asList(new Node(1, "127.0.0.1", 9092), new Node(2, "127.0.0.1", 9093)),
+                                            new Node(1, "127.0.0.1", 9092));
+    }
 }
diff --git a/core/src/main/scala/kafka/api/AdminRequest.scala b/core/src/main/scala/kafka/api/AdminRequest.scala
new file mode 100644
index 0000000..f695ebf
--- /dev/null
+++ b/core/src/main/scala/kafka/api/AdminRequest.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.ErrorMapping
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object AdminRequest {
+  val CurrentVersion: Short = 0  // the admin protocol is versioned independently of the producer protocol
+
+  object Utility {
+    val Broker = 0
+    val Topic = 1
+    val Replication = 2
+    val Controller = 3
+    val Consumer = 4
+    val Producer = 5
+  }
+
+  object Commands {
+
+    object Topic {
+      val Create = 0
+      val Alter = 1
+      val Delete = 2
+      val List = 3
+      val Describe = 4
+    }
+
+  }
+
+  def readFrom(buffer: ByteBuffer) = {
+    val versionId: Short = buffer.getShort
+    val correlationId: Int = buffer.getInt
+    val clientId: String = readShortString(buffer)
+    val utility = buffer.get
+    val command = buffer.get
+    val args = readShortString(buffer)
+    AdminRequest(versionId, correlationId, clientId, utility, command, args)
+  }
+}
+
+case class AdminRequest(versionId: Short = AdminRequest.CurrentVersion,
+                        correlationId: Int,
+                        clientId: String,
+                        utility: Byte,
+                        command: Byte,
+                        args: String)
+  extends RequestOrResponse(Some(RequestKeys.AdminKey)) {
+
+  def sizeInBytes =
+      2 + /* versionId */
+      4 + /* correlationId */
+      shortStringLength(clientId) + /* client id */
+      1 + /* utility */
+      1 + /* command */
+      shortStringLength(args) /* args */
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.put(utility)
+    buffer.put(command)
+    writeShortString(buffer, args)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = AdminResponse(e.getLocalizedMessage, ErrorMapping.AdminRequestFailedCode, correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val adminRequest = new StringBuilder
+    adminRequest.append("Name: " + this.getClass.getSimpleName)
+    adminRequest.append("; Version: " + versionId)
+    adminRequest.append("; CorrelationId: " + correlationId)
+    adminRequest.append("; ClientId: " + clientId)
+    adminRequest.append("; Utility: " + utility)
+    adminRequest.append("; Command: " + command)
+    if (details)
+      adminRequest.append("; Args: " + args)
+
+    adminRequest.toString()
+  }
+}
diff --git a/core/src/main/scala/kafka/api/AdminResponse.scala b/core/src/main/scala/kafka/api/AdminResponse.scala
new file mode 100644
index 0000000..8895031
--- /dev/null
+++ b/core/src/main/scala/kafka/api/AdminResponse.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+object AdminResponse {
+  val NoOutcome = ""  // sentinel for commands that produce no textual result
+
+  def readFrom(buffer: ByteBuffer) = {  // reads fields in the same order writeTo produces them
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val outcome = ApiUtils.readShortString(buffer)
+
+    AdminResponse(outcome, errorCode, correlationId)
+  }
+}
+
+case class AdminResponse(outcome: String, errorCode: Short, correlationId: Int = 0)
+  extends RequestOrResponse() {
+
+  def sizeInBytes =
+      4 + /* correlationId */
+      2 + /* error code */
+      ApiUtils.shortStringLength(outcome)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    ApiUtils.writeShortString(buffer, outcome)
+  }
+
+  def describe(details: Boolean) = toString
+}
diff --git a/core/src/main/scala/kafka/api/ClusterMetadataRequest.scala b/core/src/main/scala/kafka/api/ClusterMetadataRequest.scala
new file mode 100644
index 0000000..53d4b25
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ClusterMetadataRequest.scala
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+
+object ClusterMetadataRequest {
+  val CurrentVersion: Short = 0  // cluster-metadata protocol version, independent of the producer protocol
+
+  def readFrom(buffer: ByteBuffer) = {
+    val versionId: Short = buffer.getShort
+    val correlationId: Int = buffer.getInt
+    val clientId: String = readShortString(buffer)
+
+    ClusterMetadataRequest(versionId, correlationId, clientId)
+  }
+}
+
+case class ClusterMetadataRequest(versionId: Short = ClusterMetadataRequest.CurrentVersion,
+                                  correlationId: Int,
+                                  clientId: String)
+  extends RequestOrResponse(Some(RequestKeys.ClusterMetadataKey)) {  // was AdminKey — copy-paste bug; this request has its own key (see RequestKeys/KafkaApis dispatch)
+
+  def sizeInBytes =
+    2 + /* versionId */
+      4 + /* correlationId */
+      shortStringLength(clientId) /* client id */
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = ClusterMetadataResponse(Seq.empty[Broker], None,
+      ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), correlationId) // e.getClass, not e.getCause: codeFor maps exception *classes* to codes; casting a Throwable to Class would always ClassCastException
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val adminRequest = new StringBuilder
+    adminRequest.append("Name: " + this.getClass.getSimpleName)
+    adminRequest.append("; Version: " + versionId)
+    adminRequest.append("; CorrelationId: " + correlationId)
+    adminRequest.append("; ClientId: " + clientId)
+
+    adminRequest.toString()
+  }
+}
diff --git a/core/src/main/scala/kafka/api/ClusterMetadataResponse.scala b/core/src/main/scala/kafka/api/ClusterMetadataResponse.scala
new file mode 100644
index 0000000..b6d7d91
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ClusterMetadataResponse.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+
+object ClusterMetadataResponse {
+
+  private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))  // placeholder written when no controller is known, keeping the wire layout fixed
+
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val brokerCount = buffer.getInt
+    val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
+    // a controller entry is always on the wire (possibly the NoBroker placeholder), so read it unconditionally
+    val controller = Broker.readFrom(buffer)
+
+    ClusterMetadataResponse(brokers, if (errorCode == ErrorMapping.NoError) Some(controller) else None, errorCode, correlationId)
+  }
+}
+
+case class ClusterMetadataResponse(brokers: Seq[Broker], controller: Option[Broker], errorCode: Short, correlationId: Int = 0)
+  extends RequestOrResponse() {
+
+  def sizeInBytes =
+    4 + /* correlationId */
+      2 + /* error code */
+      4 + brokers.map(_.sizeInBytes).sum + /* array size + brokers */
+      controller.orElse(ClusterMetadataResponse.NoBrokerOpt).get.sizeInBytes /* controller, or the placeholder */
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    /* brokers */
+    buffer.putInt(brokers.size)
+    brokers.foreach(_.writeTo(buffer))
+    /* controller */
+    controller.orElse(ClusterMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
+  }
+
+  def describe(details: Boolean) = toString
+}
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..f4b975d 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -34,6 +34,8 @@ object RequestKeys {
   val ConsumerMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
+  val AdminKey: Short = 13
+  val ClusterMetadataKey: Short = 14
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -48,7 +50,9 @@ object RequestKeys {
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
         ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
         JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom),
+        AdminKey -> ("Admin", AdminRequest.readFrom),
+        ClusterMetadataKey -> ("ClusterMetadata", ClusterMetadataRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {
diff --git a/core/src/main/scala/kafka/api/admin/request/args/ParseException.scala b/core/src/main/scala/kafka/api/admin/request/args/ParseException.scala
new file mode 100644
index 0000000..a8bdf3e
--- /dev/null
+++ b/core/src/main/scala/kafka/api/admin/request/args/ParseException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api.admin.request.args
+
+/**
+ * Signals that an error occurred while deserializing [[kafka.api.AdminRequest]] arguments field.
+ */
+class ParseException(message: String) extends RuntimeException(message) {
+  def this() = this(null)  // no-message variant
+}
diff --git a/core/src/main/scala/kafka/api/admin/request/args/TopicCommandArguments.scala b/core/src/main/scala/kafka/api/admin/request/args/TopicCommandArguments.scala
new file mode 100644
index 0000000..45269b9
--- /dev/null
+++ b/core/src/main/scala/kafka/api/admin/request/args/TopicCommandArguments.scala
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api.admin.request.args
+
+import kafka.utils.Json
+
+import scala.collection.Map
+
+object TopicCommandArguments {
+
+  trait FromJsonDeserializable[T] {  // template: parse the JSON args string into a field map, delegate field extraction to each params object
+    def deserialize(json: String): T = {
+      try {
+        Json.parseFull(json) match {
+          case Some(map: Map[String, _]) => parseFieldsMap(map)  // unchecked match (type erasure); string keys are trusted from Json.parseFull
+          case _ => throw new ParseException("Failed to deserialize json to " + className)
+        }
+      } catch {
+        case ae: ParseException => throw ae
+        case e: Exception =>
+          throw new ParseException("Failed to deserialize json to %s. Reason: %s".format(className, e.toString))
+      }
+    }
+
+    def parseFieldsMap(map: Map[String, _]): T
+
+    val className: String
+  }
+
+  object CreateTopicParams extends FromJsonDeserializable[CreateTopicParams] {
+
+    def parseFieldsMap(map: Map[String, _]): CreateTopicParams = {
+      val topic = map.get("topic").map(_.asInstanceOf[String])
+        .getOrElse(throw new ParseException("topic is mandatory element"))
+      val partitionsOpt = map.get("partitions").map(_.asInstanceOf[String].toInt)  // assumes numeric fields arrive as JSON strings — TODO confirm against the client serializer (a JSON number would ClassCastException here)
+      val replicasOpt = map.get("replicas").map(_.asInstanceOf[String].toInt)
+      val replicaAssignmentOpt = map.get("replicaAssignment").map(_.asInstanceOf[String])
+      val configs = map.get("configs").map(_.asInstanceOf[List[String]])
+      CreateTopicParams(topic, partitionsOpt, replicasOpt, replicaAssignmentOpt, configs)
+    }
+
+    override val className = this.getClass.getSimpleName
+  }
+
+  case class CreateTopicParams(topic: String, partitions: Option[Int], replicas: Option[Int],
+                               replicaAssignment: Option[String], configs: Option[List[String]])
+
+  object AlterTopicParams extends FromJsonDeserializable[AlterTopicParams] {
+    def parseFieldsMap(map: Map[String, _]): AlterTopicParams = {
+      val topic = map.get("topic").map(_.asInstanceOf[String])
+        .getOrElse(throw new ParseException("topic is mandatory element"))
+      val addedConfigsOpt = map.get("addedConfigs").map(_.asInstanceOf[List[String]])
+      val deletedConfigsOpt = map.get("deletedConfigs").map(_.asInstanceOf[List[String]])
+      val partitionsOpt = map.get("partitions").map(_.asInstanceOf[String].toInt)
+      val replicaAssignmentOpt = map.get("replicaAssignment").map(_.asInstanceOf[String])
+
+      AlterTopicParams(topic, addedConfigsOpt, deletedConfigsOpt, partitionsOpt, replicaAssignmentOpt)
+    }
+
+    override val className = this.getClass.getSimpleName
+  }
+
+  case class AlterTopicParams(topic: String,
+                              addedConfigs: Option[List[String]],
+                              deletedConfigs: Option[List[String]],
+                              partitions: Option[Int],
+                              replicaAssignment: Option[String])
+
+  object DescribeTopicParams extends FromJsonDeserializable[DescribeTopicParams] {
+    def parseFieldsMap(map: Map[String, _]): DescribeTopicParams = {
+      val topic = map.get("topic").map(_.asInstanceOf[String])
+        .getOrElse(throw new ParseException("topic is mandatory element"))
+      val reportUnderReplicatedPartitions = map.get("reportUnderReplicatedPartitions").exists(_.asInstanceOf[String].toBoolean)  // Option.exists: an absent flag defaults to false
+      val reportUnavailablePartitions = map.get("reportUnavailablePartitions").exists(_.asInstanceOf[String].toBoolean)
+      val reportOverriddenConfigs = map.get("reportOverriddenConfigs").exists(_.asInstanceOf[String].toBoolean)
+
+      DescribeTopicParams(topic, reportUnderReplicatedPartitions, reportUnavailablePartitions, reportOverriddenConfigs)
+    }
+
+    override val className = this.getClass.getSimpleName
+  }
+
+  case class DescribeTopicParams(topic: String,
+                             reportUnderReplicatedPartitions: Boolean,
+                             reportUnavailablePartitions: Boolean,
+                             reportOverriddenConfigs: Boolean)
+
+  object DeleteTopicParams extends FromJsonDeserializable[DeleteTopicParams] {
+    def parseFieldsMap(map: Map[String, _]): DeleteTopicParams = {
+      val topic = map.get("topic").map(_.asInstanceOf[String])
+        .getOrElse(throw new ParseException("topic is mandatory element"))
+
+      DeleteTopicParams(topic)
+    }
+
+    override val className = this.getClass.getSimpleName
+  }
+
+  case class DeleteTopicParams(topic: String)
+
+  object ListTopicsParam extends FromJsonDeserializable[ListTopicsParam] {
+    def parseFieldsMap(map: Map[String, _]): ListTopicsParam = {
+      val topicOpt = map.get("topic").map(_.asInstanceOf[String])  // topic is optional here: absent means "list all topics"
+
+      ListTopicsParam(topicOpt)
+    }
+
+    override val className = this.getClass.getSimpleName
+  }
+
+  case class ListTopicsParam(topicOpt: Option[String])
+}
diff --git a/core/src/main/scala/kafka/common/AdminRequestFailedException.scala b/core/src/main/scala/kafka/common/AdminRequestFailedException.scala
new file mode 100644
index 0000000..00b8180
--- /dev/null
+++ b/core/src/main/scala/kafka/common/AdminRequestFailedException.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * A base exception for unexpected errors happening during processing [[kafka.api.AdminRequest]].
+ */
+class AdminRequestFailedException(message: String) extends RuntimeException(message) {
+  def this() = this(null)  // no-message variant
+}
+
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index eedc2f5..822edbd 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -49,6 +49,8 @@ object ErrorMapping {
   val MessageSetSizeTooLargeCode: Short = 18
   val NotEnoughReplicasCode : Short = 19
   val NotEnoughReplicasAfterAppendCode: Short = 20
+  val AdminRequestFailedCode: Short = 21
+  val InvalidRequestTargetCode: Short = 22
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -70,7 +72,9 @@ object ErrorMapping {
       classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
       classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
       classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
-      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
+      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
+      classOf[AdminRequestFailedException].asInstanceOf[Class[Throwable]] -> AdminRequestFailedCode,
+      classOf[InvalidRequestTargetException].asInstanceOf[Class[Throwable]] -> InvalidRequestTargetCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */
diff --git a/core/src/main/scala/kafka/common/InvalidRequestTargetException.scala b/core/src/main/scala/kafka/common/InvalidRequestTargetException.scala
new file mode 100644
index 0000000..17035f6
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InvalidRequestTargetException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+/**
+ * Signals that this broker cannot serve [[kafka.server.KafkaApis]]'s request
+ */
+class InvalidRequestTargetException(message: String) extends RuntimeException(message) {
+  def this() = this(null)  // no-message variant
+}
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..27cca80 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -271,7 +271,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
     controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
   }
 
-  def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
+  def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, forceSendBrokerInfo: Boolean = false) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2.toMap
@@ -298,6 +298,17 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         correlationId, broker, p._1)))
       controller.sendRequest(broker, updateMetadataRequest, null)
     }
+    if (forceSendBrokerInfo) {
+      val brokers = controllerContext.liveOrShuttingDownBrokers
+      val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
+        Map.empty[TopicAndPartition, PartitionStateInfo], brokers)
+
+      stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
+        "correlationId %d to brokers %s").format(controllerId, controllerEpoch, updateMetadataRequest,
+          correlationId, brokers))
+      brokers.foreach(broker => controller.sendRequest(broker.id, updateMetadataRequest, null))
+    }
+
     updateMetadataRequestMap.clear()
     stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
       val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 66df6d2..b716858 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -319,7 +319,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       maybeTriggerPartitionReassignment()
       maybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, forceSendBrokerInfo = true)
       if (config.autoLeaderRebalanceEnable) {
         info("starting the partition rebalance scheduler")
         autoRebalanceScheduler.startup()
@@ -396,7 +396,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
     // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
     // metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(newBrokers)
+    sendUpdateMetadataRequest(newBrokers, forceSendBrokerInfo = true)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -970,10 +970,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    * metadata requests
    * @param brokers The brokers that the update metadata request should be sent to
    */
-  def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+  def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition], forceSendBrokerInfo: Boolean = false) {
     brokerRequestBatch.newBatch()
     brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
-    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, forceSendBrokerInfo)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2a1c032..2cc8811 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,8 @@
 package kafka.server
 
 import kafka.api._
+import kafka.api.admin.request.args.TopicCommandArguments._
+import kafka.cluster.Broker
 import kafka.common._
 import kafka.log._
 import kafka.network._
@@ -62,6 +64,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+        case RequestKeys.AdminKey => handleAdminRequest(request)
+        case RequestKeys.ClusterMetadataKey => handleClusterMetadataRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -431,6 +435,101 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  /*
+   * Handle an admin request
+   */
+  def handleAdminRequest(request: RequestChannel.Request) {
+    import AdminRequest.{Utility, Commands}
+    val adminRequest = request.requestObj.asInstanceOf[AdminRequest]
+
+    if (!controller.isActive()) {
+      val response = AdminResponse("Target broker (id=%d) is not serving a controller's role.".format(brokerId),
+        ErrorMapping.InvalidRequestTargetCode, adminRequest.correlationId)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    } else {
+      adminRequest.utility match {
+        case Utility.Topic =>
+          val adminResponse =
+            adminRequest.command match {
+              case Commands.Topic.Create => handleAdminTopicCreateCommand(adminRequest)
+              case Commands.Topic.Alter => handleAdminTopicAlterCommand(adminRequest)
+              case Commands.Topic.Delete => handleAdminTopicDeleteCommand(adminRequest)
+              case Commands.Topic.List => handleAdminTopicListCommand(adminRequest)
+              case Commands.Topic.Describe => handleAdminTopicDescribeCommand(adminRequest)
+              case c => throw new AdminRequestFailedException("Unknown command %d for utility Topic (%d)"
+                .format(adminRequest.command, adminRequest.utility))
+            }
+          trace("Sending Admin (Utility %d, Command %d) response %s for correlation id %d to client %s."
+            .format(adminRequest.utility, adminRequest.command, adminResponse, adminRequest.correlationId, adminRequest.clientId))
+          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(adminResponse)))
+        case u => throw new AdminRequestFailedException("Unknown utility %d".format(u)) // was a silent no-op that never answered the client; throwing routes through AdminRequest.handleError, which sends an error response
+      }
+    }
+  }
+
+  private def successfulEmptyAdminResponse(correlationId: Int) =
+    AdminResponse(AdminResponse.NoOutcome, ErrorMapping.NoError, correlationId)
+
+  private def handleAdminTopicCreateCommand(adminRequest: AdminRequest): AdminResponse = {
+    val args = CreateTopicParams.deserialize(adminRequest.args)
+    TopicCommandHelper.createTopic(zkClient, args.topic, args.configs.getOrElse(Nil),
+      args.replicaAssignment, args.partitions, args.replicas)
+
+    successfulEmptyAdminResponse(adminRequest.correlationId)
+  }
+
+  private def handleAdminTopicAlterCommand(adminRequest: AdminRequest): AdminResponse = {
+    val args = AlterTopicParams.deserialize(adminRequest.args)
+    TopicCommandHelper.alterTopic(zkClient, args.topic, args.addedConfigs.getOrElse(Nil), args.deletedConfigs.getOrElse(Nil), args.partitions, args.replicaAssignment)
+
+    successfulEmptyAdminResponse(adminRequest.correlationId)
+  }
+
+  private def handleAdminTopicDeleteCommand(adminRequest: AdminRequest): AdminResponse = {
+    val args = DeleteTopicParams.deserialize(adminRequest.args)
+    TopicCommandHelper.deleteTopic(zkClient, args.topic)
+
+    successfulEmptyAdminResponse(adminRequest.correlationId)
+  }
+
+  private def handleAdminTopicListCommand(adminRequest: AdminRequest): AdminResponse = {
+    val args = ListTopicsParam.deserialize(adminRequest.args)
+    val topicsList =
+      TopicCommandHelper.listTopics(zkClient, args.topicOpt)
+
+    // TODO for now a simple toString instead of JSON until we decide the format
+    AdminResponse(topicsList.toString(), ErrorMapping.NoError, adminRequest.correlationId)
+  }
+
+  private def handleAdminTopicDescribeCommand(adminRequest: AdminRequest): AdminResponse = {
+    val args = DescribeTopicParams.deserialize(adminRequest.args)
+    val topicDescription =
+      TopicCommandHelper.describeTopic(zkClient, args.topic, args.reportUnderReplicatedPartitions,
+        args.reportUnavailablePartitions, args.reportOverriddenConfigs)
+
+    // TODO for now a simple toString instead of JSON until we decide the format
+    AdminResponse(topicDescription.toString, ErrorMapping.NoError, adminRequest.correlationId)
+  }
+
+  /*
+   * Handle a cluster metadata request
+   */
+  def handleClusterMetadataRequest(request: RequestChannel.Request) {
+    val clusterMetadataRequest = request.requestObj.asInstanceOf[ClusterMetadataRequest]
+    val (brokers, controllerOpt) = metadataCache.getBrokersAndController
+    val errorResponse = ClusterMetadataResponse(Seq.empty[Broker], None, ErrorMapping.UnknownCode, clusterMetadataRequest.correlationId)  // returned while this broker has not yet learned the controller from an UpdateMetadata request
+    val response =
+      controllerOpt match {
+        case Some(_) => ClusterMetadataResponse(brokers, controllerOpt, ErrorMapping.NoError, clusterMetadataRequest.correlationId)
+        case None => errorResponse
+      }
+
+    trace("Sending cluster metadata %s for correlation id %d to client %s."
+      .format(response, clusterMetadataRequest.correlationId, clusterMetadataRequest.clientId))
+
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
   def close() {
     // TODO currently closing the API is an no-op since the API no longer maintain any modules
     // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bf81a1a..beabb5d 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -34,6 +34,7 @@ private[server] class MetadataCache {
   private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
     new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
   private var aliveBrokers: Map[Int, Broker] = Map()
+  private var controller: Option[Broker] = None
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   def getTopicMetadata(topics: Set[String]) = {
@@ -86,6 +87,12 @@ private[server] class MetadataCache {
     }
   }
 
+  // Returns an atomic snapshot, taken under the metadata read lock, of the
+  // alive brokers and the cached controller broker. The controller is None
+  // until an UpdateMetadataRequest naming a live controllerId has been applied.
+  def getBrokersAndController: (Seq[Broker], Option[Broker]) = {
+    inReadLock(partitionMetadataLock) {
+      (aliveBrokers.values.toSeq, controller)
+    }
+  }
+
   def addOrUpdatePartitionInfo(topic: String,
                                partitionId: Int,
                                stateInfo: PartitionStateInfo) {
@@ -115,6 +122,7 @@ private[server] class MetadataCache {
                   stateChangeLogger: StateChangeLogger) {
     inWriteLock(partitionMetadataLock) {
       aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
+      controller = aliveBrokers.get(updateMetadataRequest.controllerId)
       updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
         if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)
diff --git a/core/src/main/scala/kafka/server/TopicCommandHelper.scala b/core/src/main/scala/kafka/server/TopicCommandHelper.scala
new file mode 100644
index 0000000..c7da35a
--- /dev/null
+++ b/core/src/main/scala/kafka/server/TopicCommandHelper.scala
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.cluster.Broker
+import kafka.common.AdminCommandFailedException
+import kafka.consumer.Whitelist
+import kafka.log.LogConfig
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.apache.kafka.common.utils.Utils.formatAddress
+
+import scala.collection.JavaConversions._
+import scala.collection._
+
+object TopicCommandHelper {
+
+  /**
+   * Resolves the topics a command targets: an explicit topic is treated as a
+   * whitelist pattern, otherwise all topics are returned. Result is sorted.
+   */
+  private def getTopics(zkClient: ZkClient, topicOpt: Option[String]): List[String] = {
+    val allTopics = ZkUtils.getAllTopics(zkClient).sorted.toList
+
+    topicOpt match {
+      case Some(topic) =>
+        val topicsFilter = new Whitelist(topic)
+        // internal topics (e.g. the offsets topic) stay visible to admin commands
+        allTopics.filter(topicsFilter.isTopicAllowed(_, excludeInternalTopics = false))
+      case None =>
+        allTopics
+    }
+  }
+
+  /**
+   * Creates a topic from either an explicit replica assignment or a
+   * (partitions, replicas) pair.
+   *
+   * @throws IllegalArgumentException if neither form is fully specified
+   */
+  def createTopic(zkClient: ZkClient,
+                  topic: String,
+                  topicConfigs: List[String],
+                  replicaAssignmentOpt: Option[String],
+                  partitionsOpt: Option[Int],
+                  replicasOpt: Option[Int]) {
+
+    val configs = parseTopicConfigsToBeAdded(topicConfigs)
+    replicaAssignmentOpt match {
+      case Some(replicaAssignment) =>
+        val assignment = parseReplicaAssignment(replicaAssignment)
+        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
+      case None =>
+        (partitionsOpt, replicasOpt) match {
+          // plain tuple pattern instead of the deprecated Pair extractor
+          case (Some(partitions), Some(replicas)) => AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
+          case _ => throw new IllegalArgumentException("Either replicaAssignment or (partitions, replicas) must be defined")
+        }
+    }
+  }
+
+  /**
+   * Alters config entries and/or partition count for every topic matching the
+   * given pattern.
+   *
+   * @throws IllegalArgumentException if no topic matches, or if the number of
+   *                                  partitions of the offsets topic would change
+   */
+  def alterTopic(zkClient: ZkClient,
+                 topic: String,
+                 addConfigs: List[String],
+                 deleteConfigs: List[String],
+                 partitionsOpt: Option[Int],
+                 replicaAssignmentOpt: Option[String]) {
+
+    val topics = getTopics(zkClient, Some(topic))
+    if (topics.isEmpty) throw new IllegalArgumentException("Topic %s does not exist".format(topic))
+
+    topics.foreach { t =>
+      val configs = AdminUtils.fetchTopicConfig(zkClient, t)
+
+      if (addConfigs.nonEmpty || deleteConfigs.nonEmpty) {
+        // apply additions first, then removals, then write back the merged config
+        configs.putAll(parseTopicConfigsToBeAdded(addConfigs))
+        parseTopicConfigsToBeDeleted(deleteConfigs).foreach(configs.remove)
+
+        AdminUtils.changeTopicConfig(zkClient, t, configs)
+      }
+
+      partitionsOpt.foreach {
+        nPartitions =>
+          if (t == OffsetManager.OffsetsTopicName) {
+            throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
+          }
+          AdminUtils.addPartitions(zkClient, t, nPartitions, replicaAssignmentOpt.getOrElse(""), config = configs)
+      }
+    }
+  }
+
+  /**
+   * @param topicOpt optionally specified topic which should be checked for existence
+   * @return List with topic marked for deletion and List with ordinary (others) topics
+   */
+  def listTopics(zkClient: ZkClient, topicOpt: Option[String]): (List[String], List[String]) = {
+    getTopics(zkClient, topicOpt)
+      .partition(topic => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)))
+  }
+
+  /**
+   * Marks every topic matching the given pattern for deletion.
+   *
+   * @throws IllegalArgumentException if no topic matches
+   * @throws IllegalStateException if a matching topic is already marked for deletion
+   */
+  def deleteTopic(zkClient: ZkClient, topic: String) {
+    val topics = getTopics(zkClient, Some(topic))
+    if (topics.isEmpty) throw new IllegalArgumentException("Topic %s does not exist".format(topic))
+
+    topics.foreach { t =>
+      try {
+        ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(t))
+      } catch {
+        case e: ZkNodeExistsException =>
+          throw new IllegalStateException("Topic %s is already marked for deletion.".format(t))
+        case e2: Throwable =>
+          // include the underlying reason instead of silently discarding the cause
+          throw new AdminOperationException("Error while deleting topic %s: %s".format(t, e2.getMessage))
+      }
+    }
+  }
+
+  // Result of a describe command; either part may be absent depending on the
+  // report* flags passed to describeTopic.
+  case class TopicDescription(topicName: String,
+                              configDetails: Option[TopicConfigDetails],
+                              partitionsDetails: Option[TopicPartitionsDetails])
+
+  case class TopicConfigDetails(partitionCount: Int,
+                                replicationFactor: Int,
+                                configs: String)
+
+  case class TopicPartitionsDetails(partitionId: Int,
+                                    leader: Option[Int],
+                                    replicas: Seq[Int],
+                                    isr: Seq[Int])
+
+  /**
+   * Describes the first topic matching the pattern, honoring the report* filters.
+   * Returns None when no topic matches or no partition assignment exists in ZK.
+   */
+  def describeTopic(zkClient: ZkClient,
+                    topic: String,
+                    reportUnderReplicatedPartitions: Boolean,
+                    reportUnavailablePartitions: Boolean,
+                    reportOverriddenConfigs: Boolean): Option[TopicDescription] = {
+    val topicOpt = getTopics(zkClient, Some(topic)).headOption
+    val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
+    var configDetails: Option[TopicConfigDetails] = None
+    // NOTE(review): partitionsDetails is an Option overwritten on every matching
+    // partition, so only the LAST matching partition ends up in the result —
+    // confirm whether a Seq of per-partition details was intended here.
+    var partitionsDetails: Option[TopicPartitionsDetails] = None
+    topicOpt.flatMap {
+      t =>
+        ZkUtils.getPartitionAssignmentForTopics(zkClient, List(t)).get(t) map {
+          topicPartitionAssignment =>
+            // the under-replicated / unavailable filters suppress the config section
+            val describeConfigs = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
+            val describePartitions = !reportOverriddenConfigs
+            val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+            if (describeConfigs) {
+              val configs = AdminUtils.fetchTopicConfig(zkClient, t)
+              if (!reportOverriddenConfigs || configs.size() != 0) {
+                val numPartitions = topicPartitionAssignment.size
+                val replicationFactor = topicPartitionAssignment.head._2.size
+                configDetails = Some(TopicConfigDetails(numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+              }
+            }
+            if (describePartitions) {
+              for ((partitionId, assignedReplicas) <- sortedPartitions) {
+                val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, t, partitionId)
+                val leader = ZkUtils.getLeaderForPartition(zkClient, t, partitionId)
+                if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
+                  (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
+                  (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+                  partitionsDetails = Some(TopicPartitionsDetails(partitionId, leader, assignedReplicas, inSyncReplicas))
+                }
+              }
+            }
+            TopicDescription(t, configDetails, partitionsDetails)
+        }
+    }
+  }
+
+  /** Renders a broker as "id (host:port)" for human-readable output. */
+  def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
+
+  /**
+   * Parses "key=val" entries into a validated Properties object.
+   *
+   * @throws IllegalArgumentException if an entry is not of the form key=val
+   */
+  def parseTopicConfigsToBeAdded(config: List[String]): Properties = {
+    val configsToBeAdded = config.map(_.split( """\s*=\s*"""))
+    require(configsToBeAdded.forall(config => config.length == 2),
+      "Invalid topic config: all configs to be added must be in the format \"key=val\".")
+    val props = new Properties
+    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
+    LogConfig.validate(props)
+    props
+  }
+
+  /** Validates and returns the trimmed config keys to be deleted. */
+  def parseTopicConfigsToBeDeleted(config: List[String]): Seq[String] = {
+    val configsToBeDeleted = config.map(_.trim)
+    val propsToBeDeleted = new Properties
+    configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
+    LogConfig.validateNames(propsToBeDeleted)
+    configsToBeDeleted
+  }
+
+  /**
+   * Parses a replica assignment string: one comma-separated entry per partition,
+   * each listing its replica broker ids separated by colons.
+   *
+   * @throws AdminCommandFailedException on duplicate replicas within a partition
+   * @throws AdminOperationException on inconsistent replication factors
+   */
+  def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
+    val partitionList = replicaAssignmentList.split(",")
+    val ret = new mutable.HashMap[Int, List[Int]]()
+    for (i <- 0 until partitionList.size) {
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      val duplicateBrokers = Utils.duplicates(brokerList)
+      if (duplicateBrokers.nonEmpty)
+        throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
+      ret.put(i, brokerList.toList)
+      if (ret(i).size != ret(0).size)
+        // mkString: concatenating the raw Array printed an opaque JVM reference
+        throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList.mkString(":"))
+    }
+    ret.toMap
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/api/AdminRequestTest.scala b/core/src/test/scala/unit/kafka/api/AdminRequestTest.scala
new file mode 100644
index 0000000..5d32968
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/api/AdminRequestTest.scala
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.api
+
+import junit.framework.Assert._
+import kafka.api.admin.request.args.ParseException
+import kafka.api.admin.request.args.TopicCommandArguments._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+class AdminRequestTest extends JUnitSuite {
+
+  // Round-trips a create-topic JSON payload where partition/replica counts are
+  // given instead of an explicit replica assignment.
+  @Test
+  def testCreateTopicParamsNoReplicaAssignment() {
+    val input = "{\"topic\" : \"abc\", " +
+              "\"partitions\" : \"1\", " +
+              "\"replicas\" : \"3\", " +
+              "\"configs\" : [\"key1:value1\",\"key2:value2\"]}"
+
+    val params = CreateTopicParams.deserialize(input)
+    assertEquals("abc", params.topic)
+    // numeric fields are carried as JSON strings and parsed into Option[Int]
+    assertEquals(Some(1), params.partitions)
+    assertEquals(Some(3), params.replicas)
+    assertEquals(Some(List("key1:value1", "key2:value2")), params.configs)
+    assertEquals(None, params.replicaAssignment)
+  }
+
+  // The alternative create-topic form: explicit replica assignment, no counts.
+  @Test
+  def testCreateTopicParamsWithReplicaAssignment() {
+    val input = "{\"topic\" : \"abc\", " +
+            "\"replicaAssignment\" : \"broker1:broker2:broker3,broker3:broker2:broker1\"}"
+
+    val params = CreateTopicParams.deserialize(input)
+    assertEquals("abc", params.topic)
+    assertEquals(None, params.partitions)
+    assertEquals(None, params.replicas)
+    assertEquals(Some("broker1:broker2:broker3,broker3:broker2:broker1"), params.replicaAssignment)
+  }
+
+  // A non-numeric "replicas" value must be rejected with a ParseException.
+  @Test(expected=classOf[ParseException])
+  def testCreateTopicParamsFailure() {
+    val input = "{\"topic\" : \"abc\", " +
+      "\"partitions\" : \"5\", " +
+      "\"replicas\" : \"shouldBeInt\", " +
+      "\"configs\" : [\"key1:value1\",\"key2:value2\"]}"
+
+    CreateTopicParams.deserialize(input)
+  }
+
+  // Alter-topic payload with added/deleted configs, a partition count, and an
+  // explicit replica assignment all present at once.
+  @Test
+  def testAlterTopicParams() {
+    val input = "{\"topic\" : \"abc\", " +
+             "\"addedConfigs\" : [\"key1:value1\",\"key2:value2\"], " +
+             "\"deletedConfigs\" : [\"key3:value3\"], " +
+             "\"partitions\" : \"2\", " +
+             "\"replicaAssignment\" : \"broker1:broker2:broker3,broker3:broker2:broker1\"}"
+
+    val params = AlterTopicParams.deserialize(input)
+    assertEquals("abc", params.topic)
+    assertEquals(Some(List("key1:value1", "key2:value2")), params.addedConfigs)
+    assertEquals(Some(List("key3:value3")), params.deletedConfigs)
+    assertEquals(Some(2), params.partitions)
+    assertEquals(Some("broker1:broker2:broker3,broker3:broker2:broker1"), params.replicaAssignment)
+  }
+
+  // Delete-topic payload carries only the topic name.
+  @Test
+  def testDeleteTopicParams() {
+    val input = "{\"topic\" : \"abc\"}"
+
+    val params = DeleteTopicParams.deserialize(input)
+    assertEquals("abc", params.topic)
+  }
+
+  // Describe-topic flags default to false when omitted from the payload
+  // (reportUnavailablePartitions is absent here and comes back false).
+  @Test
+  def testDescribeTopicParams() {
+    val input = "{\"topic\" : \"abc\", " +
+            "\"reportUnderReplicatedPartitions\" : \"false\", " +
+            "\"reportOverriddenConfigs\" : \"true\"}"
+
+    val params = DescribeTopicParams.deserialize(input)
+    assertEquals("abc", params.topic)
+    assertEquals(false, params.reportUnderReplicatedPartitions)
+    assertEquals(true, params.reportOverriddenConfigs)
+    assertEquals(false, params.reportUnavailablePartitions)
+  }
+
+  // List-topics: topic filter is optional...
+  @Test
+  def testListTopicsParamsWithTopic() {
+    val input = "{\"topic\" : \"abc\"}"
+
+    val params = ListTopicsParam.deserialize(input)
+    assertEquals(Some("abc"), params.topicOpt)
+  }
+
+  // ...and an empty payload yields None.
+  @Test
+  def testListTopicsParamsWithoutTopic() {
+    val input = "{}"
+
+    val params = ListTopicsParam.deserialize(input)
+    assertEquals(None, params.topicOpt)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..102e343 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -216,6 +216,22 @@ object SerializationTestUtils {
     val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
     JoinGroupResponseAndHeader(1, body)
   }
+
+  // Fixture: a representative AdminRequest for serialization round-trip tests.
+  def createAdminRequest: AdminRequest = {
+    AdminRequest(0, 1, "client", 2, 3, """{zookeeper:"localhost:2121"}""")
+  }
+
+  // Fixture: a no-error AdminResponse with an empty outcome for round-trip tests.
+  def createAdminResponse: AdminResponse = {
+    AdminResponse(AdminResponse.NoOutcome, ErrorMapping.NoError, 123)
+  }
+
+  // Fixture: a minimal ClusterMetadataRequest for round-trip tests.
+  def createClusterMetadataRequest: ClusterMetadataRequest = {
+    ClusterMetadataRequest(0, 1, "client")
+  }
+
+  // Fixture: a two-broker ClusterMetadataResponse with no controller elected.
+  def createClusterMetadataResponse: ClusterMetadataResponse = {
+    ClusterMetadataResponse(List(Broker(1, "127.0.0.1", 9092), Broker(2, "127.0.0.1", 9093)), None, -1)
+  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -242,6 +258,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
   private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
   private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
+  private val adminRequest = SerializationTestUtils.createAdminRequest
+  private val adminResponse = SerializationTestUtils.createAdminResponse
+  private val clusterMetadataRequest = SerializationTestUtils.createClusterMetadataRequest
+  private val clusterMetadataResponse = SerializationTestUtils.createClusterMetadataResponse
 
   @Test
   def testSerializationAndDeserialization() {
@@ -254,7 +274,8 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
                                consumerMetadataResponseNoCoordinator, heartbeatRequest,
-                               heartbeatResponse, joinGroupRequest, joinGroupResponse)
+                               heartbeatResponse, joinGroupRequest, joinGroupResponse, adminRequest, adminResponse,
+                               clusterMetadataRequest, clusterMetadataResponse)
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)
diff --git a/settings.gradle b/settings.gradle
index 83f764e..cc20c1e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,4 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools'
diff --git a/tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java b/tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java
new file mode 100644
index 0000000..e62fc9c
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/BaseCommandOpts.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli;
+
+import joptsimple.*;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Base Option manager to make jopt usage less verbose.
+ */
+public class BaseCommandOpts {
+
+    protected OptionParser parser;
+
+    public BaseCommandOpts(boolean allowUnrecognizedOptions) {
+        parser = new OptionParser();
+        if (allowUnrecognizedOptions)
+            parser.allowsUnrecognizedOptions();
+    }
+
+    private OptionSet opts = null;
+
+    public void parse(String[] args) {
+        this.opts = parser.parse(args);
+    }
+
+    public boolean has(OptionSpecBuilder opt) {
+        return opts.has(opt);
+    }
+
+    public boolean has(ArgumentAcceptingOptionSpec<?> opt) {
+        return opts.has(opt);
+    }
+
+    public <V> V valueOf(OptionSpec<V> opt) {
+        return opts.valueOf(opt);
+    }
+
+    public <V> List<V> valuesOf(OptionSpec<V> opt) {
+        return opts.valuesOf(opt);
+    }
+
+    /**
+     * Get option value with default value picked up
+     */
+    public <V> V value(OptionSpec<V> opt) {
+        return opt.value(opts);
+    }
+
+    public void printHelp() throws IOException {
+        parser.printHelpOn(System.out);
+    }
+
+    public void printUsageAndThrowException(String message) throws Exception {
+        System.err.println(message);
+        try {
+            parser.printHelpOn(System.err);
+        } catch (IOException e) {
+            // ignore
+        }
+        throw new Exception(message);
+    }
+
+    public void printUsageAndDie(String message) throws Exception {
+        System.err.println(message);
+        try {
+            parser.printHelpOn(System.err);
+        } catch (IOException e) {
+            // ignore
+        }
+        System.exit(1);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/Boot.java b/tools/src/main/java/org/apache/kafka/cli/Boot.java
new file mode 100644
index 0000000..cf9e645
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/Boot.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.cli.util.StringUtils;
+
+import java.util.Arrays;
+
+public class Boot {
+
+    /**
+     * CLI entry point: either starts the interactive shell, or executes a single
+     * topic command when one of the topic flags is present, or prints usage.
+     */
+    public static void main(String[] args) throws Exception {
+        BootOptions options = new BootOptions(true);
+        options.parse(args);
+
+        if (options.has(options.helpOpt))
+            options.printHelp();
+        else {
+            Shell shell = new Shell();
+            shell.configureShell(options.value(options.brokerOpt));
+
+            if (options.has(options.shellOpt)) {
+                shell.run();
+            } else if (options.has(options.createOpt) || options.has(options.listOpt) || options.has(options.describeOpt) ||
+                    options.has(options.deleteOpt) || options.has(options.alterOpt)) {
+                // one-shot mode: forward the raw arguments as a single command line
+                shell.executeCommand(StringUtils.mkString(Arrays.asList(args), "", " ", ""));
+            } else {
+                options.printUsageAndDie("No available options were detected.");
+            }
+        }
+    }
+
+    /**
+     * Encapsulate all command line args management.
+     */
+    static class BootOptions extends BaseCommandOpts {
+
+        public BootOptions(boolean allowUnrecognizedOptions) {
+            super(allowUnrecognizedOptions);
+        }
+
+        public OptionSpecBuilder shellOpt = parser.accepts("shell", "Start interactive shell.");
+
+        public ArgumentAcceptingOptionSpec<String> brokerOpt = parser.accepts("broker", "One of the brokers from the cluster to send requests.")
+                .withRequiredArg()
+                .describedAs("broker")
+                .ofType(String.class)
+                .defaultsTo("127.0.0.1:9092");
+        public OptionSpecBuilder listOpt = parser.accepts("list-topics", "List all available topics.");
+        public OptionSpecBuilder createOpt = parser.accepts("create-topic", "Create a new topic.");
+        public OptionSpecBuilder deleteOpt = parser.accepts("delete-topic", "Delete a topic");
+        public OptionSpecBuilder alterOpt = parser.accepts("alter-topic", "Alter the configuration for the topic.");
+        public OptionSpecBuilder describeOpt = parser.accepts("describe-topic", "List details for the given topics.");
+        public OptionSpecBuilder helpOpt = parser.accepts("help", "Show this help.");
+    }
+
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java b/tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java
new file mode 100644
index 0000000..1093af7
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/RequestDispatcher.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.*;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class RequestDispatcher {
+
+    // Wire-level error codes mirrored from the broker's ErrorMapping.
+    public static final Short NO_ERROR_ERROR_CODE = 0;
+    public static final Short INVALID_REQUEST_TARGET_ERROR_CODE = 22;
+
+    private KafkaClient client;
+    private Time time;
+    // Response wait budget in milliseconds for the final poll in sendRequest.
+    private static final Long TIMEOUT = 10000L;
+
+    // Discovered lazily via updateClusterMetadata; admin requests go here.
+    private Node controller;
+    private List<Node> brokers;
+
+    public RequestDispatcher(KafkaClient client, Time time, Node broker) {
+        this.client = client;
+        this.time = time;
+        // start with the single bootstrap broker; replaced by the discovered cluster
+        this.brokers = Arrays.asList(broker);
+    }
+
+    /**
+     * Sends an admin request to the controller, discovering it first if unknown.
+     * On INVALID_REQUEST_TARGET (controller moved) the metadata is refreshed and
+     * the request retried.
+     *
+     * NOTE(review): the retry recurses without a bound — a broker that keeps
+     * answering INVALID_REQUEST_TARGET would loop forever; confirm whether a
+     * retry limit is intended. Also, controller may still be null after
+     * updateClusterMetadata when only error responses were received (see below).
+     */
+    public AdminResponse sendAdminRequest(AdminRequest adminRequest) throws Exception {
+        if (controller == null) {
+            updateClusterMetadata();
+        }
+        ClientResponse response = this.sendRequest(adminRequest, ApiKeys.ADMIN, controller);
+        AdminResponse adminResponse = new AdminResponse(response.responseBody());
+        if (INVALID_REQUEST_TARGET_ERROR_CODE.equals(adminResponse.errorCode())) {
+            updateClusterMetadata();
+            return sendAdminRequest(adminRequest);
+        } else
+            return adminResponse;
+    }
+
+    /**
+     * Performs a blocking request/response exchange with a single node:
+     * readiness poll, send, then wait up to TIMEOUT for the response.
+     */
+    private ClientResponse sendRequest(AbstractRequestResponse request, ApiKeys apiKeys, Node node) throws Exception {
+        // TODO is it correct to always call ready and poll
+        client.ready(node, time.milliseconds());
+        client.poll(new LinkedList<ClientRequest>(), 10, time.milliseconds());
+
+        RequestSend requestSend = new RequestSend(node.id(), client.nextRequestHeader(apiKeys), request.toStruct());
+        ClientRequest clientRequest = new ClientRequest(time.milliseconds(), true, requestSend, null);
+
+        // TODO is it possible to get response from the first try
+        client.poll(Arrays.asList(clientRequest), 10, time.milliseconds());
+
+        List<ClientResponse> responses = client.poll(new LinkedList<ClientRequest>(), TIMEOUT, time.milliseconds());
+        if (responses.isEmpty())
+            throw new TimeoutException("Didn't receive response at configured timeout");
+        ClientResponse response = responses.get(0);
+        if (!response.hasResponse())
+            throw new IllegalStateException("Received response doesn't contain response body");
+
+        return response;
+    }
+
+    /**
+     * Queries the known brokers in turn for cluster metadata, stopping at the
+     * first successful (no-error) response, and caches the reported broker list
+     * and controller.
+     *
+     * NOTE(review): if every broker answers but with a non-zero error code, the
+     * last error response is still applied below, which can leave brokers empty
+     * and controller null — confirm whether that case should also throw.
+     */
+    public void updateClusterMetadata() throws Exception {
+        ClusterMetadataResponse clusterMetadataResponse = null;
+        for (Node broker : brokers) {
+            if (clusterMetadataResponse != null && clusterMetadataResponse.errorCode() == NO_ERROR_ERROR_CODE)
+                break;
+
+            try {
+                ClusterMetadataRequest clusterMetadataRequest = new ClusterMetadataRequest();
+                ClientResponse response = this.sendRequest(clusterMetadataRequest, ApiKeys.CLUSTER_METADATA, broker);
+                clusterMetadataResponse = new ClusterMetadataResponse(response.responseBody());
+            } catch (Exception e) {
+                // TODO write this to log
+                // printException(e);
+            }
+        }
+
+        if (clusterMetadataResponse == null)
+            throw new IllegalStateException("Cannot find controller. Cluster " + this.brokers.toString() + " is not responding");
+
+        this.brokers = clusterMetadataResponse.brokers();
+        this.controller = clusterMetadataResponse.controller();
+    }
+
+    public Node getController() {
+        return controller;
+    }
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/kafka/cli/Shell.java b/tools/src/main/java/org/apache/kafka/cli/Shell.java
new file mode 100644
index 0000000..a3cbf91
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/Shell.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli;
+
+import jline.console.ConsoleReader;
+import jline.console.completer.FileNameCompleter;
+import org.apache.kafka.cli.command.*;
+import org.apache.kafka.cli.util.StringUtils;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Shell {
+
+    private ConsoleReader consoleReader;
+    private RequestDispatcher requestDispatcher;
+
+    // Topic the shell is currently switched to; null when no topic context is set
+    private String currentTopic;
+    private boolean shouldExit = false;
+
+    // Command groups keyed by a padded display label; used only to render help output
+    Map<String, Command[]> commandGroups;
+    // All registered commands indexed by command name, used for dispatch
+    private Map<String, Command> allCommands;
+
+    public Shell(ConsoleReader consoleReader) throws IOException {
+        this.consoleReader = consoleReader;
+    }
+
+    public Shell() throws IOException {
+        this(new ConsoleReader());
+    }
+
+    /**
+     * Parses a "host:port" string into a bootstrap {@link Node}; the broker id is
+     * unknown at bootstrap time, so 0 is used as a placeholder.
+     */
+    private static Node parseNode(String node) {
+        String[] hostAndPort = node.split(":");
+        return new Node(0, hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+    }
+
+    /**
+     * Configures the console, creates the network client and the command registry,
+     * and discovers the cluster controller through the given bootstrap broker.
+     *
+     * @param bootstrapBroker broker address in "host:port" form
+     */
+    public void configureShell(String bootstrapBroker) throws Exception {
+        Node broker = parseNode(bootstrapBroker);
+        consoleReader.setPrompt("kafka> ");
+        consoleReader.addCompleter(new FileNameCompleter());
+
+        Time time = new SystemTime();
+
+        Metadata metadata = new Metadata();
+        metadata.update(Cluster.bootstrap(Arrays.asList(new InetSocketAddress(broker.host(), broker.port()))), time.milliseconds());
+        KafkaClient kafkaClient = new NetworkClient(new Selector(new Metrics(), time),
+                metadata, "shell", 10, 10L, 128 * 1024, 32 * 1024);
+
+        // Reuse the Time instance created above instead of allocating a second SystemTime
+        requestDispatcher = new RequestDispatcher(kafkaClient, time, broker);
+        requestDispatcher.updateClusterMetadata();
+
+        printLines("Requests will be sent to KafkaController@" + requestDispatcher.getController().host() + ":"
+                + requestDispatcher.getController().port() + ".");
+
+        // add your commands here
+        Command[] contextSwitchCommands = {new TopicSwitchCommand()};
+        Command[] administrationCommands = {new PrintHelpCommand(), new ClearScreenCommand(), new ExitCommand()};
+        Command[] topicCommands = {new CreateTopicCommand(), new ListTopicsCommand(), new DeleteTopicCommand(),
+                new DescribeTopicCommand(), new AlterTopicCommand()};
+
+        commandGroups = new HashMap<String, Command[]>();
+        commandGroups.put("Administration:         ", administrationCommands);
+        commandGroups.put("Switch context:         ", contextSwitchCommands);
+        commandGroups.put("Topic:                  ", topicCommands);
+
+        allCommands = new HashMap<String, Command>();
+        for (Command[] commands : commandGroups.values()) {
+            for (Command command : commands)
+                allCommands.put(command.commandName(), command);
+        }
+    }
+
+    /**
+     * Main read-eval loop: reads lines until EOF or until a command requests exit.
+     * Command failures are printed to the console and do not terminate the loop.
+     */
+    public void run() throws IOException {
+        String input;
+        while (true) {
+            if (shouldExit)
+                return;
+
+            input = consoleReader.readLine();
+            if (input == null) {
+                // EOF (e.g. Ctrl-D) terminates the shell
+                return;
+            }
+            try {
+                executeCommand(input);
+            } catch (Exception e) {
+                printException(e);
+            }
+        }
+    }
+
+    /**
+     * Dispatches a raw input line to the matching command, or prints a hint when
+     * the command is unknown. If "--help" appears among the arguments the command's
+     * usage is printed instead of executing it.
+     */
+    public void executeCommand(String input) throws Exception {
+        String[] commandLineArgs = input.split(" ");
+
+        String commandCandidate = extractCommand(commandLineArgs);
+        if (commandCandidate == null) {
+            return;
+        }
+        // to allow out of shell command execution
+        if (commandCandidate.startsWith("--"))
+            commandCandidate = commandCandidate.substring(2);
+
+        Command command = allCommands.get(commandCandidate);
+        if (command == null) {
+            printLines(String.format("Command \"%s\" not found. Use command \"help\" for available commands.", commandCandidate));
+        } else {
+            // NOTE(review): assumes StringUtils.contains(needle, haystack) argument order — verify against the util
+            if (StringUtils.contains("--help", commandLineArgs))
+                command.printHelp(this);
+            else
+                command.execute(input, this);
+        }
+    }
+
+    /**
+     * @return first non empty string from array, or null if every entry is blank
+     */
+    private String extractCommand(String[] args) {
+        for (String arg : args)
+            if (!arg.trim().isEmpty())
+                return arg;
+        return null;
+    }
+
+    public void clearScreen() throws IOException {
+        consoleReader.clearScreen();
+    }
+
+    /** Switches the shell to the given topic context (null clears it) and refreshes the prompt. */
+    public void changeTopic(String topic) {
+        currentTopic = topic;
+        setPrompt(currentTopic);
+    }
+
+    private void setPrompt(String topic) {
+        String prompt = "kafka";
+        if (topic != null)
+            prompt = prompt + " " + topic;
+        prompt = prompt + "> ";
+
+        consoleReader.setPrompt(prompt);
+    }
+
+    private void printException(Exception e) {
+        try {
+            printLines("An error occurred: " + e.toString());
+        } catch (IOException e1) {
+            // ignored: nothing sensible to do if the console itself is broken
+        }
+    }
+
+    /** Prints the command reference, grouped by command category. */
+    public void printHelp() throws IOException {
+        // StringBuilder instead of repeated String concatenation inside the loop
+        StringBuilder help = new StringBuilder("Use one of the available commands:");
+        for (Map.Entry<String, Command[]> commandGroup : commandGroups.entrySet()) {
+            help.append("\n").append(commandGroup.getKey());
+            for (Command cmd : commandGroup.getValue())
+                help.append("\"").append(cmd.commandName()).append("\" ");
+        }
+        printLines(help.toString(), "To get help for specific command use: \"<command> --help\"");
+    }
+
+    /** Requests loop termination; honored at the top of the next run() iteration. */
+    public void doExit() {
+        shouldExit = true;
+    }
+
+    public RequestDispatcher getRequestDispatcher() {
+        return requestDispatcher;
+    }
+
+    public void printLines(Collection<String> lines) throws IOException {
+        for (String line : lines) {
+            consoleReader.println(line);
+        }
+        consoleReader.flush();
+    }
+
+    public void printLines(String... lines) throws IOException {
+        printLines(Arrays.asList(lines));
+    }
+
+    public String getCurrentTopic() {
+        return currentTopic;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java
new file mode 100644
index 0000000..ec7e40e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/AlterTopicCommand.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import org.apache.kafka.cli.BaseCommandOpts;
+import org.apache.kafka.cli.RequestDispatcher;
+import org.apache.kafka.cli.Shell;
+import org.apache.kafka.cli.util.StringUtils;
+import org.apache.kafka.common.requests.AdminRequest;
+import org.apache.kafka.common.requests.AdminResponse;
+
+import java.io.IOException;
+import java.util.List;
+
+public class AlterTopicCommand extends Command {
+
+    private AlterTopicCommandOpts options = new AlterTopicCommandOpts(true);
+
+    /**
+     * Alters a topic: adds/removes config overrides, changes the replica assignment
+     * or the partition count. The topic is taken from --topic or, if absent, from
+     * the shell's current topic context.
+     */
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        options.parse(commandLine.split(" "));
+
+        validate(options, shell.getCurrentTopic());
+
+        String topic;
+        if (options.has(options.topicOpt)) {
+            topic = options.valueOf(options.topicOpt);
+        } else {
+            topic = shell.getCurrentTopic();
+        }
+
+        AdminRequest request = generateTopicAlterRequest(topic,
+                options.valuesOf(options.configsOpt), options.valuesOf(options.deleteConfigsOpt),
+                options.valueOf(options.replicaAssignmentOpt),
+                options.valueOf(options.partitionsOpt));
+        AdminResponse response = shell.getRequestDispatcher().sendAdminRequest(request);
+        if (response.errorCode() == RequestDispatcher.NO_ERROR_ERROR_CODE) {
+            shell.printLines(String.format("Updated topic \"%s\"", topic));
+        } else {
+            shell.printLines("An error occurred while updating topic. Reason: " + response.outcome());
+        }
+    }
+
+    /** Fails with usage output unless a topic is resolvable and at least one alteration was requested. */
+    private void validate(AlterTopicCommandOpts options, String topic) throws Exception {
+        if (!options.has(options.topicOpt) && topic == null) {
+            options.printUsageAndThrowException("Arguments validation failed: Topic is not chosen and topic name was not defined.");
+        } else if (!options.has(options.partitionsOpt) && !options.has(options.replicaAssignmentOpt) &&
+                !options.has(options.configsOpt) && !options.has(options.deleteConfigsOpt))
+            options.printUsageAndThrowException(
+                    "Arguments validation failed: Specify at least one from - partitions, replica-assignment, config, delete-config");
+    }
+
+    @Override
+    public String commandName() {
+        return "alter-topic";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Alter the configuration for the topic.");
+        options.printHelp();
+    }
+
+    // TODO draft version until we finalize Admin RQ/RP format
+    // partitions is boxed: joptsimple's valueOf returns null when --partitions is absent,
+    // and the former int parameter auto-unboxed it and threw NPE for e.g. config-only alters.
+    private AdminRequest generateTopicAlterRequest(String topicName, List<String> addConfigs,
+                                                   List<String> deleteConfigs,
+                                                   String replicaAssignment,
+                                                   Integer partitions) {
+        String args = String.format("{\"topic\" : \"%s\" ", topicName);
+        if (addConfigs != null && !addConfigs.isEmpty())
+            args += String.format(", \"addedConfigs\" : %s ", configsAsJsonArray(addConfigs));
+        if (deleteConfigs != null && !deleteConfigs.isEmpty())
+            args += String.format(", \"deletedConfigs\" : %s ", configsAsJsonArray(deleteConfigs));
+        if (replicaAssignment != null)
+            args += String.format(", \"replicaAssignment\" : \"%s\" ", replicaAssignment);
+        else if (partitions != null)
+            // only emit partitions when actually requested; previously it was emitted unconditionally
+            args += String.format(", \"partitions\" : \"%d\" ", partitions);
+        args += "}";
+        return new AdminRequest((byte) 1, (byte) 1, args);
+    }
+
+    /** Renders configs as a JSON string array, e.g. ["k1=v1","k2=v2"]. */
+    private String configsAsJsonArray(List<String> configs) {
+        return StringUtils.mkString(configs, "[\"", "\",\"", "\"]");
+    }
+
+    class AlterTopicCommandOpts extends BaseCommandOpts {
+
+        public AlterTopicCommandOpts(boolean allowUnrecognizedOptions) {
+            super(allowUnrecognizedOptions);
+        }
+
+        public ArgumentAcceptingOptionSpec<String> topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
+                "expression except for --create option")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        public ArgumentAcceptingOptionSpec<Integer> partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created " +
+                "(WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
+                .withRequiredArg()
+                .describedAs("# of partitions")
+                .ofType(Integer.class);
+        public ArgumentAcceptingOptionSpec<String> replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
+                .withRequiredArg()
+                .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
+                .ofType(String.class);
+        public ArgumentAcceptingOptionSpec<String> configsOpt = parser.accepts("config", "A topic configuration override for the topic being created." +
+                "See the Kafka documentation for full details on the topic configs.")
+                .withRequiredArg()
+                .describedAs("name=value")
+                .ofType(String.class);
+
+        public ArgumentAcceptingOptionSpec<String> deleteConfigsOpt = parser.accepts("delete-config", "A topic configuration to be removed for the topic being altered." +
+                "See the Kafka documentation for full details on the topic configs.")
+                .withRequiredArg()
+                .describedAs("name=value")
+                .ofType(String.class);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java
new file mode 100644
index 0000000..17fbc0b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/ClearScreenCommand.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import org.apache.kafka.cli.Shell;
+
+import java.io.IOException;
+
+public class ClearScreenCommand extends Command {
+
+    @Override
+    public String commandName() {
+        return "clear";
+    }
+
+    /** Clears the console screen; takes no arguments. */
+    @Override
+    public void execute(String commandLine, Shell shell) throws IOException {
+        shell.clearScreen();
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Clear screen. No arguments.");
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/Command.java b/tools/src/main/java/org/apache/kafka/cli/command/Command.java
new file mode 100644
index 0000000..624ca14
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/Command.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import org.apache.kafka.cli.Shell;
+
+import java.io.IOException;
+
+/**
+ * Base contract for all shell commands; each command declares its name,
+ * executes against a {@link Shell}, and can print its own usage.
+ */
+public abstract class Command {
+
+    /**
+     * Executes this command.
+     *
+     * @param commandLine the full raw input line, including the command name and all arguments
+     * @param shell       the shell instance, used for output and shared state (current topic, dispatcher)
+     */
+    public abstract void execute(String commandLine, Shell shell) throws Exception;
+
+    /** @return the name under which this command is registered and invoked */
+    public abstract String commandName();
+
+    /** Prints usage information for this command to the shell. */
+    public abstract void printHelp(Shell shell) throws IOException;
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java
new file mode 100644
index 0000000..2776950
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/CreateTopicCommand.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import org.apache.kafka.cli.BaseCommandOpts;
+import org.apache.kafka.cli.RequestDispatcher;
+import org.apache.kafka.cli.Shell;
+import org.apache.kafka.cli.util.StringUtils;
+import org.apache.kafka.common.requests.AdminRequest;
+import org.apache.kafka.common.requests.AdminResponse;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.List;
+
+public class CreateTopicCommand extends Command {
+
+    private CreateTopicCommandOpts options = new CreateTopicCommandOpts(true);
+
+    /**
+     * Creates a topic from either a (partitions, replication-factor) pair or an
+     * explicit replica assignment, with optional config overrides.
+     */
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        options.parse(commandLine.split(" "));
+
+        validate(options);
+
+        AdminRequest request = generateTopicCreateRequest(options.valueOf(options.topicOpt), options.valuesOf(options.configsOpt),
+                options.valueOf(options.replicaAssignmentOpt),
+                options.valueOf(options.partitionsOpt), options.valueOf(options.replicationFactorOpt));
+        AdminResponse response = shell.getRequestDispatcher().sendAdminRequest(request);
+        if (response.errorCode() == RequestDispatcher.NO_ERROR_ERROR_CODE) {
+            shell.printLines(String.format("Created topic \"%s\"", options.valueOf(options.topicOpt)));
+        } else {
+            shell.printLines("An error occurred while creating topic. Reason: " + response.outcome());
+        }
+    }
+
+    /** Requires a topic name plus either the (partitions, replication-factor) pair or a replica assignment. */
+    private void validate(CreateTopicCommandOpts options) throws Exception {
+        if (!options.has(options.topicOpt)) {
+            options.printUsageAndThrowException("Arguments validation failed: Topic name was not defined.");
+        } else if (!(options.has(options.partitionsOpt) && options.has(options.replicationFactorOpt)) &&
+                !options.has(options.replicaAssignmentOpt))
+            options.printUsageAndThrowException(
+                    "Arguments validation failed: Either (partitions and replication-factor) pair or replica-assignment must be specified");
+    }
+
+    @Override
+    public String commandName() {
+        return "create-topic";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        // was a copy-pasted "List details for the given topic." — this command creates topics
+        shell.printLines("Create a new topic.");
+        options.printHelp();
+    }
+
+    // TODO draft version until we finalize Admin RQ/RP format
+    // Parameters are boxed and named after the options they receive: the previous signature
+    // declared (int replicas, int partitions) in swapped order (values still landed in the
+    // right JSON fields, but the naming was misleading), and auto-unboxing threw NPE when
+    // only --replica-assignment was supplied (validate permits that combination).
+    private AdminRequest generateTopicCreateRequest(String topicName, List<String> configs,
+                                                    String replicaAssignment,
+                                                    Integer partitions, Integer replicationFactor) {
+        String args = String.format("{\"topic\" : \"%s\", ", topicName);
+        if (configs != null && !configs.isEmpty())
+            args += String.format("\"configs\" : %s, ", configsAsJsonArray(configs));
+        if (replicaAssignment != null)
+            args += String.format("\"replicaAssignment\" : \"%s\"}", replicaAssignment);
+        else
+            args += String.format("\"partitions\" : \"%d\", " +
+                    "\"replicas\" : \"%d\"}", partitions, replicationFactor);
+        return new AdminRequest((byte) 1, (byte) 0, args);
+    }
+
+    /** Renders configs as a JSON string array, e.g. ["k1=v1","k2=v2"]. */
+    private String configsAsJsonArray(List<String> configs) {
+        return StringUtils.mkString(configs, "[\"", "\",\"", "\"]");
+    }
+
+    class CreateTopicCommandOpts extends BaseCommandOpts {
+
+        public CreateTopicCommandOpts(boolean allowUnrecognizedOptions) {
+            super(allowUnrecognizedOptions);
+        }
+
+        public ArgumentAcceptingOptionSpec<String> topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
+                "expression except for --create option")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        public ArgumentAcceptingOptionSpec<Integer> partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created " +
+                "(WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
+                .withRequiredArg()
+                .describedAs("# of partitions")
+                .ofType(Integer.class);
+        public ArgumentAcceptingOptionSpec<Integer> replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
+                .withRequiredArg()
+                .describedAs("replication factor")
+                .ofType(Integer.class);
+        public ArgumentAcceptingOptionSpec<String> replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
+                .withRequiredArg()
+                .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
+                .ofType(String.class);
+
+        public ArgumentAcceptingOptionSpec<String> configsOpt = parser.accepts("config", "A topic configuration override for the topic being created." +
+                "See the Kafka documentation for full details on the topic configs.")
+                .withRequiredArg()
+                .describedAs("name=value")
+                .ofType(String.class);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java
new file mode 100644
index 0000000..fc94464
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/DeleteTopicCommand.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import org.apache.kafka.cli.BaseCommandOpts;
+import org.apache.kafka.cli.Shell;
+import org.apache.kafka.common.requests.AdminRequest;
+import org.apache.kafka.common.requests.AdminResponse;
+
+import java.io.IOException;
+
+public class DeleteTopicCommand extends Command {
+
+    DeleteTopicCommandOpts options = new DeleteTopicCommandOpts(true);
+
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        options.parse(commandLine.split(" "));
+
+        validate(options, shell.getCurrentTopic());
+
+        String topic;
+        if (options.has(options.topicOpt)) {
+            topic = options.valueOf(options.topicOpt);
+        } else {
+            topic = shell.getCurrentTopic();
+        }
+        AdminRequest request = generateTopicDeleteRequest(topic);
+
+        AdminResponse response = shell.getRequestDispatcher().sendAdminRequest(request);
+        if (response.errorCode() == 0) {
+            shell.printLines(String.format("Deleted topic \"%s\"", topic));
+            if (options.valueOf(options.topicOpt).equals(shell.getCurrentTopic()))
+                shell.changeTopic(null);
+        } else {
+            shell.printLines("An error occurred while deleting topic. Reason: " + response.outcome());
+        }
+    }
+
+    // TODO draft version until we finalize Admin RQ/RP format
+    private AdminRequest generateTopicDeleteRequest(String topicName) {
+        return new AdminRequest((byte) 1, (byte) 2,
+                String.format("{\"topic\" : \"%s\"}", topicName));
+    }
+
+    private void validate(DeleteTopicCommandOpts options, String topic) throws Exception {
+        if (!options.has(options.topicOpt) && topic == null)
+            options.printUsageAndThrowException("Both topic option and context were not defined.");
+    }
+
+    @Override
+    public String commandName() {
+        return "delete-topic";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Create a new topic.");
+        options.printHelp();
+    }
+
+    class DeleteTopicCommandOpts extends BaseCommandOpts {
+
+        public DeleteTopicCommandOpts(boolean allowUnrecognizedOptions) {
+            super(allowUnrecognizedOptions);
+        }
+
+        public ArgumentAcceptingOptionSpec<String> topicOpt = parser.accepts("topic", "The topic to be deleted.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+    }
+}
+
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java
new file mode 100644
index 0000000..72aeb6a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/DescribeTopicCommand.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.cli.BaseCommandOpts;
+import org.apache.kafka.cli.Shell;
+import org.apache.kafka.common.requests.AdminRequest;
+import org.apache.kafka.common.requests.AdminResponse;
+
+import java.io.IOException;
+
+public class DescribeTopicCommand extends Command {
+    DescribeTopicCommandOpts options = new DescribeTopicCommandOpts(true);
+
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        options.parse(commandLine.split(" "));
+
+        validate(options, shell.getCurrentTopic());
+
+        String topic;
+        if (options.has(options.topicOpt)) {
+            topic = options.valueOf(options.topicOpt);
+        } else {
+            topic = shell.getCurrentTopic();
+        }
+        AdminRequest request = generateTopicDescribeRequest(topic, options.has(options.reportUnderReplicatedPartitionsOpt),
+                options.has(options.reportUnavailablePartitionsOpt), options.has(options.topicsWithOverridesOpt));
+
+        AdminResponse response = shell.getRequestDispatcher().sendAdminRequest(request);
+        if (response.errorCode() == 0) {
+            shell.printLines("Topic description:", response.outcome());
+        } else {
+            shell.printLines("An error occurred while getting topic description. Reason: " + response.outcome());
+        }
+    }
+
+    private void validate(DescribeTopicCommandOpts options, String topic) throws Exception {
+        if (!options.has(options.topicOpt) && topic == null)
+            options.printUsageAndThrowException("Both topic option and context were not defined.");
+    }
+
+    // TODO draft version until we finalize Admin RQ/RP format
+    private AdminRequest generateTopicDescribeRequest(String topicName, boolean reportUnderReplicatedPartitions,
+                                                      boolean reportUnavailablePartitions, boolean reportOverriddenConfigs) {
+        return new AdminRequest((byte) 1, (byte) 4,
+                String.format("{\"topic\" : \"%s\", \"reportUnderReplicatedPartitions\" : \"%s\", " +
+                                "\"reportUnavailablePartitions\" : \"%s\" ," +
+                                "\"reportOverriddenConfigs\" : \"%s\"}", topicName, String.valueOf(reportUnderReplicatedPartitions),
+                        String.valueOf(reportUnavailablePartitions), String.valueOf(reportOverriddenConfigs)));
+    }
+
+
+    @Override
+    public String commandName() {
+        return "describe-topic";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Delete a topic.");
+        options.printHelp();
+    }
+
+    class DescribeTopicCommandOpts extends BaseCommandOpts {
+
+        public DescribeTopicCommandOpts(boolean allowUnrecognizedOptions) {
+            super(allowUnrecognizedOptions);
+        }
+
+        public ArgumentAcceptingOptionSpec<String> topicOpt = parser.accepts("topic", "The topic to be described.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        public OptionSpecBuilder reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
+                "if set when describing topics, only show under replicated partitions");
+        public OptionSpecBuilder reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
+                "if set when describing topics, only show partitions whose leader is not available");
+
+        public OptionSpecBuilder topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
+                "if set when describing topics, only show topics that have overridden configs");
+    }
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java
new file mode 100644
index 0000000..5daa1c9
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/ExitCommand.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import org.apache.kafka.cli.Shell;
+
+import java.io.IOException;
+
+public class ExitCommand extends Command {
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        shell.doExit();
+    }
+
+    @Override
+    public String commandName() {
+        return "exit";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Exit this shell. No arguments.");
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java
new file mode 100644
index 0000000..71181de
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/ListTopicsCommand.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import org.apache.kafka.cli.BaseCommandOpts;
+import org.apache.kafka.cli.Shell;
+import org.apache.kafka.common.requests.AdminRequest;
+import org.apache.kafka.common.requests.AdminResponse;
+
+import java.io.IOException;
+
+public class ListTopicsCommand extends Command {
+    ListTopicsCommandOpts options = new ListTopicsCommandOpts(true);
+
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        options.parse(commandLine.split(" "));
+
+        AdminRequest request =
+                generateTopicsListRequest(options.has(options.topicOpt) ? options.valueOf(options.topicOpt) : null);
+
+        AdminResponse response = shell.getRequestDispatcher().sendAdminRequest(request);
+        if (response.errorCode() == 0) {
+            // TODO for now simple to string until we finalize RQ/RP format
+            shell.printLines("(Topics marked for deletion), (Ordinary topics):", response.outcome());
+        } else {
+            shell.printLines("An error occurred while listing topics. Reason: " + response.outcome());
+        }
+    }
+
+    // TODO draft version until we finalize Admin RQ/RP format
+    private AdminRequest generateTopicsListRequest(String topicName) {
+        String args;
+        if (topicName != null) {
+            args = String.format("{\"topic\" : \"%s\"}", topicName);
+        } else
+            args = "{}";
+
+        return new AdminRequest((byte) 1, (byte) 3, args);
+    }
+
+    @Override
+    public String commandName() {
+        return "list-topics";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("List all available topics.");
+        options.printHelp();
+    }
+
+    class ListTopicsCommandOpts extends BaseCommandOpts {
+
+        public ListTopicsCommandOpts(boolean allowUnrecognizedOptions) {
+            super(allowUnrecognizedOptions);
+        }
+
+        public ArgumentAcceptingOptionSpec<String> topicOpt = parser.accepts("topic", "The topic to be checked for existence.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java
new file mode 100644
index 0000000..f588d37
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/PrintHelpCommand.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import org.apache.kafka.cli.Shell;
+
+import java.io.IOException;
+
+public class PrintHelpCommand extends Command{
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        shell.printHelp();
+    }
+
+    @Override
+    public String commandName() {
+        return "help";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Prints all available commands.");
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java b/tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java
new file mode 100644
index 0000000..d5e0409
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/command/TopicSwitchCommand.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.command;
+
+import org.apache.kafka.cli.Shell;
+import org.apache.kafka.common.requests.AdminRequest;
+import org.apache.kafka.common.requests.AdminResponse;
+
+import java.io.IOException;
+
+public class TopicSwitchCommand extends Command {
+    @Override
+    public void execute(String commandLine, Shell shell) throws Exception {
+        String[] args = commandLine.split(" ");
+        String topic = args.length == 2 && !args[1].isEmpty() ? args[1] : null;
+
+        if (topic == null) {
+            shell.changeTopic(null);
+        } else {
+            //TODO doesn't exist handling and print if no such topic exists
+            AdminRequest adminRequest = new AdminRequest((byte) 1, (byte) 3,
+                    String.format("{\"topic\" : \"%s\"}", topic));
+            AdminResponse response = shell.getRequestDispatcher().sendAdminRequest(adminRequest);
+            if (response.errorCode() == 0 && response.outcome().contains(topic)) {
+                shell.changeTopic(topic);
+                shell.printLines("Switched to \"" + topic + "\" successfully");
+            } else {
+                shell.printLines("Could not switch to topic \"" + topic + "\". Reason: " + response.outcome());
+            }
+        }
+
+    }
+
+    @Override
+    public String commandName() {
+        return "topic";
+    }
+
+    @Override
+    public void printHelp(Shell shell) throws IOException {
+        shell.printLines("Switches context to given topic so all topic commands will be applied to switched topic.",
+                "Sample usage: \"topic <your_topic>\". To switch off the topic context use just: \"topic\".");
+    }
+
+}
diff --git a/tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java b/tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java
new file mode 100644
index 0000000..c0a6aec
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/cli/util/StringUtils.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.cli.util;
+
+
+import java.util.Collection;
+
/**
 * Small string helpers for the CLI tool.
 */
public class StringUtils {

    /**
     * Joins the given strings with {@code sep}, surrounded by {@code start}
     * and {@code end}. An empty collection yields the empty string — the
     * start/end delimiters are NOT emitted in that case.
     */
    public static String mkString(Collection<String> values, String start, String sep, String end) {
        if (values.isEmpty())
            return "";

        StringBuilder joined = new StringBuilder(start);
        boolean first = true;
        for (String value : values) {
            if (!first)
                joined.append(sep);
            joined.append(value);
            first = false;
        }
        return joined.append(end).toString();
    }

    /**
     * Returns true if {@code arg} equals at least one element of {@code list}.
     * Elements are assumed non-null (a null element throws NPE, matching the
     * original contract); an empty list yields false.
     */
    public static boolean contains(String arg, String... list) {
        for (int i = 0; i < list.length; i++) {
            if (list[i].equals(arg))
                return true;
        }
        return false;
    }
}
-- 
1.9.1

