diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
deleted file mode 100644
index 3da4518..0000000
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
-
-object DeleteTopicCommand {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
-                         .withRequiredArg
-                         .describedAs("topic")
-                         .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                      "Multiple URLS can be given to allow fail-over.")
-                           .withRequiredArg
-                           .describedAs("urls")
-                           .ofType(classOf[String])
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, zkConnectOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
-      println("deletion succeeded!")
-    }
-    catch {
-      case e =>
-        println("delection failed because of " + e.getMessage)
-        println(Utils.stackTrace(e))
-    }
-    finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicsCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicsCommand.scala
new file mode 100644
index 0000000..2175975
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DeleteTopicsCommand.scala
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import kafka.common.AdminCommandFailedException
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
+object DeleteTopicsCommand extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicsOpt = parser.accepts("topics", "REQUIRED: Comma separated list of the topics to be deleted.")
+                           .withRequiredArg
+                           .describedAs("topics")
+                           .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLS can be given to allow fail-over.")
+                               .withRequiredArg
+                               .describedAs("urls")
+                               .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicsOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topics = options.valueOf(topicsOpt).split(",").toSeq
+    val zkConnect = options.valueOf(zkConnectOpt)
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val existingTopics = ZkUtils.getAllTopics(zkClient)
+      for (topic <- topics) {
+        if (!existingTopics.contains(topic)) {
+          println("Topic " + topic + " does not exist")
+          exit(1)
+        }
+      }
+      val deleteTopicsCommand = new DeleteTopicsCommand(zkClient, topics)
+      deleteTopicsCommand.deleteTopics()
+    } catch {
+      case e =>
+        println("Deletion failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def parseDeleteTopicsData(data: String): Set[String] = {
+    Json.parseFull(data) match {
+      case Some(m) => m.asInstanceOf[Map[String, Any]].get("topics") match {
+        case Some(topics) => topics.asInstanceOf[List[String]].toSet
+        case None => throw new AdministrationException("Delete topics data is empty or malformed")
+      }
+      case None => throw new AdministrationException("Delete topics data is empty or malformed")
+    }
+  }
+
+  def writeDeleteTopicsDataToZk(zkClient: ZkClient, topics: Seq[String]) {
+    val deleteTopicsZkPath = ZkUtils.deleteTopicsPath
+    val jsonTopicsData = Utils.seqToJson(topics, valueInQuotes = true)
+    val jsonData = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString), valueInQuotes = false) ++
+                                         Utils.mapToJsonFields(Map("topics" -> jsonTopicsData), valueInQuotes = false))
+    try {
+      ZkUtils.createPersistentPath(zkClient, deleteTopicsZkPath, jsonData)
+      info("Created delete topic path with %s".format(jsonData))
+    } catch {
+      case nee: ZkNodeExistsException =>
+        val topicBeingDeleted = parseDeleteTopicsData(ZkUtils.readData(zkClient, deleteTopicsZkPath)._1)
+        throw new AdministrationException("Topics " + topics.mkString(",") + " are currently being deleted")
+      case e => throw new AdministrationException(e.toString)
+    }
+  }
+}
+
+class DeleteTopicsCommand(val zkClient: ZkClient, val topics: Seq[String]) extends Logging {
+
+  def deleteTopics() {
+    try {
+      DeleteTopicsCommand.writeDeleteTopicsDataToZk(zkClient, topics)
+    } catch {
+      case e => throw new AdminCommandFailedException("Admin command failed", e)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74614d8..9547f02 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,7 +21,7 @@ import collection.immutable.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{DeleteTopicsCommand, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
@@ -50,7 +50,8 @@ class ControllerContext(val zkClient: ZkClient,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
-                          new mutable.HashSet) {
+                          new mutable.HashSet,
+                        var topicsBeingDeleted: mutable.Set[String] = new mutable.HashSet) {
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -235,6 +236,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerPreferredReplicaElectionListener()
+      registerDeleteTopicsListener()
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
@@ -244,6 +246,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
       initializeAndMaybeTriggerPartitionReassignment()
       initializeAndMaybeTriggerPreferredReplicaElection()
+      initializeAndMaybeTriggerTopicDeletion()
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -312,11 +315,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
       deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
+    // handle dead replicas
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
   }
 
   /**
@@ -401,6 +404,32 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
+  def onTopicsDeletion(topics: Set[String]) {
+    info("Starting deletion of topics %s".format(topics.mkString(",")))
+    try {
+      controllerContext.topicsBeingDeleted ++= topics
+      val partitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, topics.toSeq)
+      val partitions: mutable.Set[TopicAndPartition] = new mutable.HashSet
+      for ((topic, replicaAssignment) <- partitionAssignment) {
+        for ((partition, replicas) <- replicaAssignment) {
+          partitions.add(new TopicAndPartition(topic, partition))
+        }
+      }
+      // The replica/partition state machine transitions below update the corresponding zookeeper data
+      val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
+      val replicasToStop = ZkUtils.getAllReplicasOnBroker(zkClient, topics.toSeq, brokers.map(_.id))
+      replicaStateMachine.handleStateChanges(replicasToStop, OfflineReplica)
+      partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+      replicaStateMachine.handleStateChanges(replicasToStop, NonExistentReplica)
+      partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
+      topics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getTopicPath(topic)))
+    } catch {
+      case e => error("Error completing deletion of topics " + topics.mkString(","), e)
+    } finally {
+      removeFromTopicsBeingDeleted(topics)
+    }
+  }
+
   /**
    * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
    * is the controller. It merely registers the session expiration listener and starts the controller leader
@@ -516,6 +545,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
   }
 
+  private def initializeAndMaybeTriggerTopicDeletion() {
+    // Read the topics being deleted from zookeeper path /admin/delete_topics
+    val topicsBeingDeleted = ZkUtils.getTopicsBeingDeleted(zkClient)
+    // Check if they are already deleted
+    val topicsThatWereDeleted = topicsBeingDeleted.filter(!controllerContext.allTopics.contains(_))
+    controllerContext.topicsBeingDeleted ++= topicsBeingDeleted
+    controllerContext.topicsBeingDeleted --= topicsThatWereDeleted
+    info("Topics being deleted: %s".format(topicsBeingDeleted.mkString(",")))
+    info("Topics that were already deleted: %s".format(topicsThatWereDeleted.mkString(",")))
+    info("Resuming deletion for topics: %s".format(controllerContext.topicsBeingDeleted.mkString(",")))
+    onTopicsDeletion(controllerContext.topicsBeingDeleted.toSet)
+  }
+
   private def startChannelManager() {
     controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config)
     controllerContext.controllerChannelManager.startup()
@@ -620,6 +662,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
   }
 
+  private def registerDeleteTopicsListener() {
+    zkClient.subscribeDataChanges(ZkUtils.deleteTopicsPath, new DeleteTopicsListener(this))
+  }
+
   private def registerControllerChangedListener() {
     zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
   }
@@ -663,6 +709,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
+  def removeFromTopicsBeingDeleted(topicsToBeRemoved: Set[String]) {
+    for (topic <- topicsToBeRemoved) {
+      if (ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPath(topic))._1 == None) {
+        info("Topic %s was deleted successfully".format(topic))
+      } else {
+        warn("Topic %s could not be deleted".format(topic))
+      }
+    }
+    ZkUtils.deletePath(zkClient, ZkUtils.deleteTopicsPath)
+    controllerContext.topicsBeingDeleted --= topicsToBeRemoved
+  }
+
   private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
     partitions.map { p =>
       val replicas = controllerContext.partitionReplicaAssignment(p)
@@ -932,8 +990,39 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
   }
 
   /**
-   * @throws Exception
-   *             On any error.
+   * @throws Exception on any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+}
+
+/**
+ * Starts the deletion of topics specified under /admin/delete_topics
+ */
+class DeleteTopicsListener(controller: KafkaController) extends IZkDataListener with Logging {
+  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]:"
+  val controllerContext = controller.controllerContext
+  val zkClient = controllerContext.zkClient
+
+  /**
+   * Invoked when some topics are specified for deletion by the admin command
+   * @throws Exception on any error
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    debug("Delete topics listener fired for path %s. Record topics to undergo deletion %s".format(dataPath, data.toString))
+    val topicsToBeDeleted = DeleteTopicsCommand.parseDeleteTopicsData(data.toString).toSet
+
+    controllerContext.controllerLock synchronized {
+      info("These topics are already being deleted: %s".format(controllerContext.topicsBeingDeleted.mkString(",")))
+      val newTopicsToBeDeleted = topicsToBeDeleted.filter(t => !controllerContext.topicsBeingDeleted.contains(t))
+      controller.onTopicsDeletion(newTopicsToBeDeleted)
+    }
+  }
+
+  /**
+   * @throws Exception on any error.
    */
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 156bb10..deb7b3a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -171,7 +171,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists"
                                     .format(controllerId, controller.epoch, topicAndPartition))
-          partitionState.put(topicAndPartition, NonExistentPartition)
+          deletePartition(topicAndPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
@@ -181,6 +181,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
+  private def deletePartition(topicAndPartition: TopicAndPartition) {
+    partitionState.remove(topicAndPartition)
+    controllerContext.partitionLeadershipInfo.remove(topicAndPartition)
+    controllerContext.partitionReplicaAssignment.remove(topicAndPartition)
+    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+    controllerContext.partitionsUndergoingPreferredReplicaElection.remove(topicAndPartition)
+  }
+
   /**
    * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
    * zookeeper
@@ -363,13 +371,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             val currentChildren = JavaConversions.asBuffer(children).toSet
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
-            //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
             controllerContext.allTopics = currentChildren
 
             val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
             controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
               !deletedTopics.contains(p._1.topic))
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+
+            // TODO: Once deletion is fully handled by DeleteTopicsListener, drop the deleted-topic bookkeeping above and rename this to TopicAdditionListener
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
             if(newTopics.size > 0)
@@ -377,7 +386,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           } catch {
             case e => error("Error while handling new topic", e )
           }
-          // TODO: kafka-330  Handle deleted topics
         }
       }
     }
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ef2356f..211c356 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -126,16 +126,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put((topic, partition, replicaId), NewReplica)
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
                                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-        case NonExistentReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
-          // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
-          // remove this replica from the assigned replicas list for its partition
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
-          replicaState.remove((topic, partition, replicaId))
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
@@ -152,7 +142,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                 case Some(leaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                     replicaAssignment.size)
-                  replicaState.put((topic, partition, replicaId), OnlineReplica)
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
@@ -172,9 +161,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     case Some(updatedLeaderIsrAndControllerEpoch) =>
                       // send the shrunk ISR state change request only to the leader
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch,
-                        replicaAssignment.size)
-                      replicaState.put((topic, partition, replicaId), OfflineReplica)
+                                                                          topic, partition, updatedLeaderIsrAndControllerEpoch,
+                                                                          replicaAssignment.size)
                       stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
                                                 .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                       false
@@ -185,10 +173,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               case None =>
                 true
             }
-          if (leaderAndIsrIsEmpty)
-            throw new StateChangeFailedException(
-              "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
-              .format(replicaId, topicAndPartition))
+          replicaState.put((topic, partition, replicaId), OfflineReplica)
+        case NonExistentReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
+          // send stop replica command
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
+          // remove this replica from the assigned replicas list for its partition
+          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
+          replicaState.remove((topic, partition, replicaId))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
       }
     }
     catch {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7d71451..dc437f4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -126,7 +126,10 @@ private[kafka] class Log(val dir: File,
     
   /* Calculate the offset of the next message */
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
-  
+
+  val topic = name.substring(0, name.lastIndexOf("-"))
+  val partition = name.substring(name.lastIndexOf("-") + 1, name.length).toInt
+
   debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
   newGauge(name + "-" + "NumLogSegments",
@@ -287,7 +290,7 @@ private[kafka] class Log(val dir: File,
               }
               val assignedLastOffset = offsetCounter.get - 1
               val numMessages = assignedLastOffset - firstOffset + 1
-              BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
+              BrokerTopicStats.getBrokerTopicStats(topic).messagesInRate.mark(numMessages)
               BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
               assignedLastOffset
             } else {
@@ -653,9 +656,6 @@ private[kafka] class Log(val dir: File,
     }
   }
 
-  def topicName():String = {
-    name.substring(0, name.lastIndexOf("-"))
-  }
 
   def getLastFlushedTime():Long = {
     return lastflushedTime.get
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 497cfdd..6671249 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -281,6 +281,18 @@ private[kafka] class LogManager(val config: KafkaConfig,
   }
 
   /**
+   * Delete the logs of the given topic-partitions and remove them from the log registry.
+   */
+  def deleteLogs(topicAndPartitions: TopicAndPartition*) {
+    for (topicAndPartition <- topicAndPartitions) {
+      val log = logs.get(topicAndPartition)
+      log.markDeletedWhile(_ => true)
+      log.delete()
+      logs.remove(topicAndPartition)
+    }
+  }
+
+  /**
    * Close all the logs
    */
   def shutdown() {
@@ -311,15 +323,15 @@ private[kafka] class LogManager(val config: KafkaConfig,
       try {
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.logFlushIntervalMs
-        if(logFlushIntervals.contains(log.topicName))
-          logFlushInterval = logFlushIntervals(log.topicName)
-        debug(log.topicName + " flush interval  " + logFlushInterval +
+        if(logFlushIntervals.contains(log.topic))
+          logFlushInterval = logFlushIntervals(log.topic)
+        debug(log.topic + " flush interval  " + logFlushInterval +
                       " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       } catch {
         case e =>
-          error("Error flushing topic " + log.topicName, e)
+          error("Error flushing topic " + log.topic, e)
           e match {
             case _: IOException =>
               fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68e712c..bd42d82 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,13 +114,13 @@ class ReplicaManager(val config: KafkaConfig,
     getReplica(topic, partitionId) match {
       case Some(replica) =>
         replicaFetcherManager.removeFetcher(topic, partitionId)
-        /* TODO: handle deleteLog in a better way */
-        //if (deletePartition)
-        //  logManager.deleteLog(topic, partition)
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
         allPartitions.remove((topic, partitionId))
+        if (deletePartition) {
+          logManager.deleteLogs(TopicAndPartition(topic, partitionId))
+        }
         info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
@@ -239,6 +239,12 @@ class ReplicaManager(val config: KafkaConfig,
       // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
       // have been completely populated before starting the checkpointing there by avoiding weird race conditions
       if (!hwThreadInitialized) {
+        // If the controller did not instruct this broker to initialize replicas for certain partitions in the first leaderAndIsr request,
+        // delete the logs of those partitions.
+        val partitionsToBeDeleted = logManager.allLogs.map(log => (log.topic, log.partition)).filter(p => !allPartitions.contains(p))
+        val logsToBeDeleted = partitionsToBeDeleted.map(p => TopicAndPartition(p._1, p._2)).toSeq
+        logManager.deleteLogs(logsToBeDeleted:_*)
+        info("Logs of these partitions were deleted: %s".format(partitionsToBeDeleted.mkString(",")))
         startHighWaterMarksCheckPointThread()
         hwThreadInitialized = true
       }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c639efb..b20806b 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -476,7 +476,7 @@ object Utils extends Logging {
   }
 
  /**
-   * Format a Map[String, String] as JSON object.
+   * Format a Map[String, String] as a sequence of JSON fields.
    */
   def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
     val jsonFields: mutable.ListBuffer[String] = ListBuffer()
@@ -507,7 +507,7 @@ object Utils extends Logging {
     val builder = new StringBuilder
     builder.append("[ ")
     if (valueInQuotes)
-      builder.append(jsonData.map("\"" + _ + "\"")).mkString(", ")
+      builder.append(jsonData.map("\"" + _ + "\"").mkString(", "))
     else
       builder.append(jsonData.mkString(", "))
     builder.append(" ]")
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ce1904b..5c7b2cf 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -43,6 +43,7 @@ object ZkUtils extends Logging {
   val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val deleteTopicsPath = "/admin/delete_topics"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -420,7 +421,8 @@ object ZkUtils extends Logging {
                       } catch {
                         case e: ZkNoNodeException =>
                           (None, stat)
-                        case e2 => throw e2
+                        case e2 =>
+                          throw e2
                       }
     dataAndStat
   }
@@ -638,11 +640,12 @@ object ZkUtils extends Logging {
     }
   }
 
-  def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
-    val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClient.delete(brokerIdPath)
-    val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
-    zkClient.delete(brokerPartTopicPath)
+  def getTopicsBeingDeleted(zkClient: ZkClient): Set[String] = {
+    val jsonTopicListOpt = readDataMaybeNull(zkClient, deleteTopicsPath)._1
+    jsonTopicListOpt match {
+      case Some(jsonTopicList) => DeleteTopicsCommand.parseDeleteTopicsData(jsonTopicList)
+      case None => Set.empty[String]
+    }
   }
 
   def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] = {
