diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
deleted file mode 100644
index 3da4518..0000000
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
-
-object DeleteTopicCommand {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
-                         .withRequiredArg
-                         .describedAs("topic")
-                         .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                      "Multiple URLS can be given to allow fail-over.")
-                           .withRequiredArg
-                           .describedAs("urls")
-                           .ofType(classOf[String])
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, zkConnectOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
-      println("deletion succeeded!")
-    }
-    catch {
-      case e =>
-        println("delection failed because of " + e.getMessage)
-        println(Utils.stackTrace(e))
-    }
-    finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicsCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicsCommand.scala
new file mode 100644
index 0000000..2175975
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DeleteTopicsCommand.scala
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import kafka.common.AdminCommandFailedException
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
+object DeleteTopicsCommand extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicsOpt = parser.accepts("topics", "REQUIRED: Comma separated list of the topics to be deleted.")
+                           .withRequiredArg
+                           .describedAs("topics")
+                           .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLS can be given to allow fail-over.")
+                               .withRequiredArg
+                               .describedAs("urls")
+                               .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicsOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topics = options.valueOf(topicsOpt).split(",").toSeq
+    val zkConnect = options.valueOf(zkConnectOpt)
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val existingTopics = ZkUtils.getAllTopics(zkClient)
+      for (topic <- topics) {
+        if (!existingTopics.contains(topic)) {
+          println("Topic " + topic + " does not exist")
+          exit(1)
+        }
+      }
+      val deleteTopicsCommand = new DeleteTopicsCommand(zkClient, topics)
+      deleteTopicsCommand.deleteTopics()
+    } catch {
+      case e =>
+        println("Deletion failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def parseDeleteTopicsData(data: String): Set[String] = {
+    Json.parseFull(data) match {
+      case Some(m) => m.asInstanceOf[Map[String, Any]].get("topics") match {
+        case Some(topics) => topics.asInstanceOf[List[String]].toSet
+        case None => throw new AdministrationException("Delete topics data is empty or malformed")
+      }
+      case None => throw new AdministrationException("Delete topics data is empty or malformed")
+    }
+  }
+
+  def writeDeleteTopicsDataToZk(zkClient: ZkClient, topics: Seq[String]) {
+    val deleteTopicsZkPath = ZkUtils.deleteTopicsPath
+    val jsonTopicsData = Utils.seqToJson(topics, valueInQuotes = true)
+    val jsonData = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString), valueInQuotes = false) ++
+                                         Utils.mapToJsonFields(Map("topics" -> jsonTopicsData), valueInQuotes = false))
+    try {
+      ZkUtils.createPersistentPath(zkClient, deleteTopicsZkPath, jsonData)
+      info("Created delete topic path with %s".format(jsonData))
+    } catch {
+      case nee: ZkNodeExistsException =>
+        val topicsBeingDeleted = parseDeleteTopicsData(ZkUtils.readData(zkClient, deleteTopicsZkPath)._1)
+        throw new AdministrationException("Topic deletion is already in progress for topics " + topicsBeingDeleted.mkString(","))
+      case e => throw new AdministrationException(e.toString)
+    }
+  }
+}
+
+class DeleteTopicsCommand(val zkClient: ZkClient, val topics: Seq[String]) extends Logging {
+
+  def deleteTopics() {
+    try {
+      DeleteTopicsCommand.writeDeleteTopicsDataToZk(zkClient, topics)
+    } catch {
+      case e => throw new AdminCommandFailedException("Admin command failed", e)
+    }
+  }
+}
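For reference, a minimal sketch of driving the new command programmatically instead of through main(); the ZooKeeper connection string and topic names are placeholders, and the 30s timeouts simply mirror the values used in main() above. Per writeDeleteTopicsDataToZk, this writes a JSON payload of roughly {"version": 1, "topics": [...]} to /admin/delete_topics, which the controller's DeleteTopicsListener then acts on.

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZKStringSerializer
    import kafka.admin.DeleteTopicsCommand

    // Illustrative only: connection string and topic names are placeholders.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      new DeleteTopicsCommand(zkClient, Seq("topic1", "topic2")).deleteTopics()
    } finally {
      zkClient.close()
    }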
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2ca7ee6..b346eaf 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,7 @@ import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 
@@ -52,6 +52,7 @@ class Partition(val topic: String,
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+  private var deleted = false
   this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -120,7 +121,12 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
-
+  def delete() {
+    leaderIsrUpdateLock synchronized {
+      deleted = true
+      replicaManager.logManager.deleteLog(TopicAndPartition(topic, partitionId))
+    }
+  }
   /**
    *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
    *  1. stop the existing replica fetcher
@@ -131,6 +137,12 @@ class Partition(val topic: String,
   def makeLeader(controllerId: Int, topic: String, partitionId: Int,
                  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
+      if (deleted) {
+        stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
+                                 "controller %d epoch %d for partition [%s,%d] since the partition has been deleted")
+                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, partitionId))
+        return false
+      }
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
         stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
@@ -168,6 +180,12 @@ class Partition(val topic: String,
   def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                    liveBrokers: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
+      if (deleted) {
+        stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
+                                 "controller %d epoch %d for partition [%s,%d] since the partition has been deleted")
+                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, partitionId))
+        return false
+      }
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
         stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
@@ -205,52 +223,61 @@ class Partition(val topic: String,
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
-      debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
-      val replica = getOrCreateReplica(replicaId)
-      replica.logEndOffset = offset
+      if (deleted) {
+        warn("Partition [%s,%d] has already been deleted, cannot update leader HW or expand ISR".format(topic, partitionId))
+      } else {
+        debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
+        val replica = getOrCreateReplica(replicaId)
+        replica.logEndOffset = offset
 
-      // check if this replica needs to be added to the ISR
-      leaderReplicaIfLocal() match {
-        case Some(leaderReplica) =>
-          val replica = getReplica(replicaId).get
-          val leaderHW = leaderReplica.highWatermark
-          if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
-            // expand ISR
-            val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
-            // update ISR in ZK and cache
-            updateIsr(newInSyncReplicas)
-            replicaManager.isrExpandRate.mark()
-          }
-          maybeIncrementLeaderHW(leaderReplica)
-        case None => // nothing to do if no longer leader
+        // check if this replica needs to be added to the ISR
+        leaderReplicaIfLocal() match {
+          case Some(leaderReplica) =>
+            val replica = getReplica(replicaId).get
+            val leaderHW = leaderReplica.highWatermark
+            if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
+              // expand ISR
+              val newInSyncReplicas = inSyncReplicas + replica
+              info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
+              // update ISR in ZK and cache
+              updateIsr(newInSyncReplicas)
+              replicaManager.isrExpandRate.mark()
+            }
+            maybeIncrementLeaderHW(leaderReplica)
+          case None => // nothing to do if no longer leader
+        }
       }
     }
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
     leaderIsrUpdateLock synchronized {
-      leaderReplicaIfLocal() match {
-        case Some(_) =>
-          val numAcks = inSyncReplicas.count(r => {
-            if (!r.isLocal)
-              r.logEndOffset >= requiredOffset
-            else
-              true /* also count the local (leader) replica */
-          })
-          trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
-          if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
-            (requiredAcks > 0 && numAcks >= requiredAcks)) {
-            /*
-            * requiredAcks < 0 means acknowledge after all replicas in ISR
-            * are fully caught up to the (local) leader's offset
-            * corresponding to this produce request.
-            */
-            (true, ErrorMapping.NoError)
-          } else
-            (false, ErrorMapping.NoError)
-        case None =>
-          (false, ErrorMapping.NotLeaderForPartitionCode)
+      if (deleted) {
+        warn("Partition [%s,%d] has already been deleted, cannot check enough replicas reach offset")
+        (false, ErrorMapping.PartitionAlreadyDeletedCode)
+      } else {
+        leaderReplicaIfLocal() match {
+          case Some(_) =>
+            val numAcks = inSyncReplicas.count(r => {
+              if (!r.isLocal)
+                r.logEndOffset >= requiredOffset
+              else
+                true /* also count the local (leader) replica */
+            })
+            trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+            if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
+              (requiredAcks > 0 && numAcks >= requiredAcks)) {
+              /*
+              * requiredAcks < 0 means acknowledge after all replicas in ISR
+              * are fully caught up to the (local) leader's offset
+              * corresponding to this produce request.
+              */
+              (true, ErrorMapping.NoError)
+            } else
+              (false, ErrorMapping.NoError)
+          case None =>
+            (false, ErrorMapping.NotLeaderForPartitionCode)
+        }
       }
     }
   }
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index c8769e0..795f608 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -41,6 +41,7 @@ object ErrorMapping {
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
   val StaleControllerEpochCode: Short = 11
+  val PartitionAlreadyDeletedCode: Short = 12
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -54,7 +55,8 @@ object ErrorMapping {
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
       classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
-      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode,
+      classOf[PartitionAlreadyDeletedException].asInstanceOf[Class[Throwable]] -> PartitionAlreadyDeletedCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
diff --git a/core/src/main/scala/kafka/common/PartitionAlreadyDeletedException.scala b/core/src/main/scala/kafka/common/PartitionAlreadyDeletedException.scala
new file mode 100644
index 0000000..fc186ec
--- /dev/null
+++ b/core/src/main/scala/kafka/common/PartitionAlreadyDeletedException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the partition was already deleted.
+ */
+class PartitionAlreadyDeletedException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
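A quick sanity-check sketch (not part of the patch) of how the new exception round-trips through ErrorMapping, assuming ErrorMapping.codeFor keeps its existing Class[Throwable] => Short signature; the cast mirrors the style used in the ErrorMapping change above.

    import kafka.common.{ErrorMapping, PartitionAlreadyDeletedException}

    // The new exception class maps to PartitionAlreadyDeletedCode (12) via exceptionToCode.
    val code: Short = ErrorMapping.codeFor(classOf[PartitionAlreadyDeletedException].asInstanceOf[Class[Throwable]])
    assert(code == ErrorMapping.PartitionAlreadyDeletedCode)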
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 037a995..9213d1c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,7 +21,7 @@ import collection.immutable.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{DeleteTopicsCommand, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
@@ -50,7 +50,8 @@ class ControllerContext(val zkClient: ZkClient,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
-                          new mutable.HashSet) {
+                          new mutable.HashSet,
+                        var topicsBeingDeleted: mutable.Set[String] = new mutable.HashSet) {
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -235,6 +236,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerPreferredReplicaElectionListener()
+      registerDeleteTopicsListener()
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
@@ -244,6 +246,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
       initializeAndMaybeTriggerPartitionReassignment()
       initializeAndMaybeTriggerPreferredReplicaElection()
+      initializeAndMaybeTriggerTopicDeletion()
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -312,11 +315,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
       deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
+    // handle dead replicas
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
   }
 
   /**
@@ -401,6 +404,27 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
+  def onTopicsDeletion(topics: Set[String]) {
+    info("Starting deletion of topics %s".format(topics.mkString(",")))
+    try {
+      controllerContext.topicsBeingDeleted ++= topics
+      val partitions = controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic))
+      val partitionsToDelete = partitions.keys.toSet
+      val replicasToDelete = partitionsToDelete.map(p => partitions(p).filter(controllerContext.liveBrokerIds.contains(_)).map(r =>
+         PartitionAndReplica(p.topic, p.partition, r))
+      ).flatten.toSet
+      replicaStateMachine.handleStateChanges(replicasToDelete, OfflineReplica)
+      partitionStateMachine.handleStateChanges(partitionsToDelete, OfflinePartition)
+      replicaStateMachine.handleStateChanges(replicasToDelete, NonExistentReplica)
+      partitionStateMachine.handleStateChanges(partitionsToDelete, NonExistentPartition)
+      topics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getTopicPath(topic)))
+    } catch {
+      case e => error("Error completing deletion of topics " + topics.mkString(","), e)
+    } finally {
+      removeFromTopicsBeingDeleted(topics)
+    }
+  }
+
   /**
    * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
    * is the controller. It merely registers the session expiration listener and starts the controller leader
@@ -516,6 +540,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
   }
 
+  private def initializeAndMaybeTriggerTopicDeletion() {
+    // Read the topics being deleted from zookeeper path /admin/delete_topics
+    val topicsBeingDeleted = ZkUtils.getTopicsBeingDeleted(zkClient)
+    // Check if they are already deleted
+    val topicsThatWereDeleted = topicsBeingDeleted.filter(!controllerContext.allTopics.contains(_))
+    controllerContext.topicsBeingDeleted ++= topicsBeingDeleted
+    controllerContext.topicsBeingDeleted --= topicsThatWereDeleted
+    info("Topics being deleted: %s".format(topicsBeingDeleted.mkString(",")))
+    info("Topics that were already deleted: %s".format(topicsThatWereDeleted.mkString(",")))
+    info("Resuming deletion for topics: %s".format(controllerContext.topicsBeingDeleted.mkString(",")))
+    onTopicsDeletion(controllerContext.topicsBeingDeleted.toSet)
+  }
+
   private def startChannelManager() {
     controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config)
     controllerContext.controllerChannelManager.startup()
@@ -620,6 +657,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
   }
 
+  private def registerDeleteTopicsListener() {
+    zkClient.subscribeDataChanges(ZkUtils.deleteTopicsPath, new DeleteTopicsListener(this))
+  }
+
   private def registerControllerChangedListener() {
     zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
   }
@@ -663,6 +704,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
+  def removeFromTopicsBeingDeleted(topicsToBeRemoved: Set[String]) {
+    for (topic <- topicsToBeRemoved) {
+      if (ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPath(topic))._1.isEmpty) {
+        info("Topic %s was deleted successfully".format(topic))
+      } else {
+        warn("Topic %s could not be deleted".format(topic))
+      }
+    }
+    ZkUtils.deletePath(zkClient, ZkUtils.deleteTopicsPath)
+    controllerContext.topicsBeingDeleted --= topicsToBeRemoved
+  }
+
   private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
     partitions.map { p =>
       val replicas = controllerContext.partitionReplicaAssignment(p)
@@ -932,8 +985,39 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
   }
 
   /**
-   * @throws Exception
-   *             On any error.
+   * @throws Exception on any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+}
+
+/**
+ * Starts the deletion of topics specified under /admin/delete_topics
+ */
+class DeleteTopicsListener(controller: KafkaController) extends IZkDataListener with Logging {
+  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]:"
+  val controllerContext = controller.controllerContext
+  val zkClient = controllerContext.zkClient
+
+  /**
+   * Invoked when some topics are specified for deletion by the admin command
+   * @throws Exception on any error
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    debug("Delete topics listener fired for path %s. Record topics to undergo deletion %s".format(dataPath, data.toString))
+    val topicsToBeDeleted = DeleteTopicsCommand.parseDeleteTopicsData(data.toString).toSet
+
+    controllerContext.controllerLock synchronized {
+      info("These topics are already being deleted: %s".format(controllerContext.topicsBeingDeleted.mkString(",")))
+      val newTopicsToBeDeleted = topicsToBeDeleted.filter(t => !controllerContext.topicsBeingDeleted.contains(t))
+      controller.onTopicsDeletion(newTopicsToBeDeleted)
+    }
+  }
+
+  /**
+   * @throws Exception on any error.
    */
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c017727..1d1d11c 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -172,7 +172,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists"
                                     .format(controllerId, controller.epoch, topicAndPartition))
-          partitionState.put(topicAndPartition, NonExistentPartition)
+          deletePartition(topicAndPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
@@ -182,6 +182,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
+  private def deletePartition(topicAndPartition: TopicAndPartition) {
+    partitionState.remove(topicAndPartition)
+    controllerContext.partitionLeadershipInfo.remove(topicAndPartition)
+    controllerContext.partitionReplicaAssignment.remove(topicAndPartition)
+    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+    controllerContext.partitionsUndergoingPreferredReplicaElection.remove(topicAndPartition)
+  }
+
   /**
    * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
    * zookeeper
@@ -364,13 +372,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             val currentChildren = JavaConversions.asBuffer(children).toSet
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
-            //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
             controllerContext.allTopics = currentChildren
 
             val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
             controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
               !deletedTopics.contains(p._1.topic))
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+
+            // TODO: remove the deleted-topic handling above and rename this listener to TopicAdditionListener
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
             if(newTopics.size > 0)
@@ -378,7 +387,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           } catch {
             case e => error("Error while handling new topic", e )
           }
-          // TODO: kafka-330  Handle deleted topics
         }
       }
     }
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index bea1644..afd1a81 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -127,16 +127,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put((topic, partition, replicaId), NewReplica)
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
                                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-        case NonExistentReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
-          // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
-          // remove this replica from the assigned replicas list for its partition
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
-          replicaState.remove((topic, partition, replicaId))
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
@@ -153,7 +143,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                 case Some(leaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                     replicaAssignment.size)
-                  replicaState.put((topic, partition, replicaId), OnlineReplica)
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
@@ -173,9 +162,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     case Some(updatedLeaderIsrAndControllerEpoch) =>
                       // send the shrunk ISR state change request only to the leader
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch,
-                        replicaAssignment.size)
-                      replicaState.put((topic, partition, replicaId), OfflineReplica)
+                                                                          topic, partition, updatedLeaderIsrAndControllerEpoch,
+                                                                          replicaAssignment.size)
                       stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
                                                 .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                       false
@@ -186,10 +174,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               case None =>
                 true
             }
-          if (leaderAndIsrIsEmpty)
-            throw new StateChangeFailedException(
-              "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
-              .format(replicaId, topicAndPartition))
+          replicaState.put((topic, partition, replicaId), OfflineReplica)
+        case NonExistentReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
+          // send stop replica command
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
+          // remove this replica from the assigned replicas list for its partition
+          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
+          replicaState.remove((topic, partition, replicaId))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
       }
     }
     catch {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7d71451..641fabf 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -126,7 +126,11 @@ private[kafka] class Log(val dir: File,
     
   /* Calculate the offset of the next message */
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
-  
+
+  val topic = name.substring(0, name.lastIndexOf("-"))
+  val partition = name.substring(name.lastIndexOf("-") + 1, name.length).toInt
+  private var deleted = false
+
   debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
   newGauge(name + "-" + "NumLogSegments",
@@ -256,7 +260,7 @@ private[kafka] class Log(val dir: File,
    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
    * 
    * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set, 
-   * or (-1,-1) if the message set is empty
+   * or (-1,-1) if the message set is empty, or (-2,-2) if the log has been deleted.
    */
   def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
     val messageSetInfo = analyzeAndValidateMessageSet(messages)
@@ -271,58 +275,63 @@ private[kafka] class Log(val dir: File,
       try {
         // they are valid, insert them in the log
         val offsets = lock synchronized {
-          val firstOffset = nextOffset.get
-
-          // maybe roll the log if this segment is full
-          val segment = maybeRoll(segments.view.last)
-          
-          // assign offsets to the messageset
-          val lastOffset =
-            if(assignOffsets) {
-              val offsetCounter = new AtomicLong(nextOffset.get)
-              try {
-                validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
-              } catch {
-                case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+          if (deleted) {
+            warn("Log for partition [%s,%d] has been deleted, cannot append messages".format(topic, partition))
+            (-2L, -2L)
+          } else {
+            val firstOffset = nextOffset.get
+
+            // maybe roll the log if this segment is full
+            val segment = maybeRoll(segments.view.last)
+
+            // assign offsets to the messageset
+            val lastOffset =
+              if(assignOffsets) {
+                val offsetCounter = new AtomicLong(nextOffset.get)
+                try {
+                  validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+                } catch {
+                  case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+                }
+                val assignedLastOffset = offsetCounter.get - 1
+                val numMessages = assignedLastOffset - firstOffset + 1
+                BrokerTopicStats.getBrokerTopicStats(topic).messagesInRate.mark(numMessages)
+                BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
+                assignedLastOffset
+              } else {
+                require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
+                require(messageSetInfo.firstOffset >= nextOffset.get,
+                        "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
+                        .format(messageSetInfo.firstOffset, nextOffset.get))
+                messageSetInfo.lastOffset
               }
-              val assignedLastOffset = offsetCounter.get - 1
-              val numMessages = assignedLastOffset - firstOffset + 1
-              BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
-              BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
-              assignedLastOffset
-            } else {
-              require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
-              require(messageSetInfo.firstOffset >= nextOffset.get, 
-                      "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
-                      .format(messageSetInfo.firstOffset, nextOffset.get))
-              messageSetInfo.lastOffset
+
+            // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
+            // happens with the new message size (after re-compression, if any)
+            for(messageAndOffset <- validMessages.shallowIterator) {
+              if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
+                throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+                  .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
             }
 
-          // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
-          // happens with the new message size (after re-compression, if any)
-          for(messageAndOffset <- validMessages.shallowIterator) {
-            if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
-              throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
-                .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
-          }
+            // now append to the log
+            segment.append(firstOffset, validMessages)
 
-          // now append to the log
-          segment.append(firstOffset, validMessages)
-          
-          // advance the log end offset
-          nextOffset.set(lastOffset + 1)
+            // advance the log end offset
+            nextOffset.set(lastOffset + 1)
 
-          trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
-                  .format(this.name, firstOffset, nextOffset.get(), validMessages))
+            trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
+                    .format(this.name, firstOffset, nextOffset.get(), validMessages))
 
-          // return the offset at which the messages were appended
-          (firstOffset, lastOffset)
+            // return the offset at which the messages were appended
+            (firstOffset, lastOffset)
+          }
         }
-        
+
         // maybe flush the log and index
         val numAppendedMessages = (offsets._2 - offsets._1 + 1).toInt
         maybeFlush(numAppendedMessages)
-        
+
         // return the first and last offset
         offsets
       } catch {
@@ -401,9 +410,12 @@ private[kafka] class Log(val dir: File,
     val view = segments.view
         
     // check if the offset is valid and in range
-    val first = view.head.start
+    val first = view.headOption match {
+      case Some(head) => head.start
+      case None => -1L
+    }
     val next = nextOffset.get
-    if(startOffset == next)
+    if(first == -1L || startOffset == next)
       return MessageSet.Empty
     else if(startOffset > next || startOffset < first)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next))
@@ -421,27 +433,32 @@ private[kafka] class Log(val dir: File,
    */
   def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
     lock synchronized {
-      debug("Garbage collecting log..")
-      debug("Segments of log %s : %s ".format(this.name, segments.view.mkString(",")))
-      debug("Index files for log %s: %s".format(this.name, segments.view.map(_.index.file.exists()).mkString(",")))
-      debug("Data files for log %s: %s".format(this.name, segments.view.map(_.messageSet.file.exists()).mkString(",")))
-      val view = segments.view
-      val deletable = view.takeWhile(predicate)
-      for(seg <- deletable)
-        seg.deleted = true
-      var numToDelete = deletable.size
-      // if we are deleting everything, create a new empty segment
-      if(numToDelete == view.size) {
-        if (view(numToDelete - 1).size > 0)
-          roll()
-        else {
-          // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
-          // file name. So simply reuse the last segment and reset the modified time.
-          view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds)
-          numToDelete -=1
+      if (deleted) {
+        warn("Log for partition [%s,%d] has been deleted, cannot delete segments".format(topic, partition))
+        Seq.empty[LogSegment]
+      } else {
+        debug("Garbage collecting log..")
+        debug("Segments of log %s : %s ".format(this.name, segments.view.mkString(",")))
+        debug("Index files for log %s: %s".format(this.name, segments.view.map(_.index.file.exists()).mkString(",")))
+        debug("Data files for log %s: %s".format(this.name, segments.view.map(_.messageSet.file.exists()).mkString(",")))
+        val view = segments.view
+        val deletable = view.takeWhile(predicate)
+        for(seg <- deletable)
+          seg.deleted = true
+        var numToDelete = deletable.size
+        // if we are deleting everything, create a new empty segment
+        if(numToDelete == view.size) {
+          if (view(numToDelete - 1).size > 0)
+            roll()
+          else {
+            // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
+            // file name. So simply reuse the last segment and reset the modified time.
+            view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds)
+            numToDelete -=1
+          }
         }
+        segments.trunc(numToDelete)
       }
-      segments.trunc(numToDelete)
     }
   }
 
@@ -527,12 +544,16 @@ private[kafka] class Log(val dir: File,
       return
 
     lock synchronized {
-      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
-          time.milliseconds)
-      segments.view.last.flush()
-      unflushed.set(0)
-      lastflushedTime.set(time.milliseconds)
-     }
+      if (deleted) {
+        warn("Log for partition [%s,%d] has been deleted, cannot flush".format(topic, partition))
+      } else {
+        debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+            time.milliseconds)
+        segments.view.last.flush()
+        unflushed.set(0)
+        lastflushedTime.set(time.milliseconds)
+      }
+    }
   }
 
   def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
@@ -577,8 +598,18 @@ private[kafka] class Log(val dir: File,
   }
 
   def delete(): Unit = {
-    deleteSegments(segments.contents.get())
-    Utils.rm(dir)
+    lock synchronized {
+      if (deleted) {
+        warn("Log for partition [%s,%d] has already been deleted, cannot delete again".format(topic, partition))
+      } else {
+        deleted = true
+        this.close()
+        segments.view.foreach(_.deleted = true)
+        val deletedSegments = segments.trunc(segments.view.size)
+        deleteSegments(deletedSegments)
+        Utils.rm(dir)
+      }
+    }
   }
 
 
@@ -606,31 +637,35 @@ private[kafka] class Log(val dir: File,
     if(targetOffset < 0)
       throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
     lock synchronized {
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
-      val viewSize = segments.view.size
-      val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
-      /* We should not hit this error because segments.view is locked in markedDeletedWhile() */
-      if(numSegmentsDeleted != segmentsToBeDeleted.size)
-        error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")")
-      if (numSegmentsDeleted == viewSize) {
-        segments.trunc(segments.view.size)
-        rollToOffset(targetOffset)
-        this.nextOffset.set(targetOffset)
+      if (deleted) {
+        warn("Partition [%s,%d] has been deleted, cannot truncate to target offset".format(topic, partition))
       } else {
-        if(targetOffset > logEndOffset) {
-          error("Target offset %d cannot be greater than the last message offset %d in the log %s".
-                format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
+        val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+        val viewSize = segments.view.size
+        val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
+        /* We should not hit this error because segments.view is locked in markedDeletedWhile() */
+        if(numSegmentsDeleted != segmentsToBeDeleted.size)
+          error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")")
+        if (numSegmentsDeleted == viewSize) {
+          segments.trunc(segments.view.size)
+          rollToOffset(targetOffset)
+          this.nextOffset.set(targetOffset)
         } else {
-          // find the log segment that has this hw
-          val segmentToBeTruncated = findRange(segments.view, targetOffset)
-          segmentToBeTruncated match {
-            case Some(segment) =>
-              val truncatedSegmentIndex = segments.view.indexOf(segment)
-              segments.truncLast(truncatedSegmentIndex)
-              segment.truncateTo(targetOffset)
-              this.nextOffset.set(targetOffset)
-              info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
-            case None => // nothing to do
+          if(targetOffset > logEndOffset) {
+            error("Target offset %d cannot be greater than the last message offset %d in the log %s".
+                  format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
+          } else {
+            // find the log segment that has this hw
+            val segmentToBeTruncated = findRange(segments.view, targetOffset)
+            segmentToBeTruncated match {
+              case Some(segment) =>
+                val truncatedSegmentIndex = segments.view.indexOf(segment)
+                segments.truncLast(truncatedSegmentIndex)
+                segment.truncateTo(targetOffset)
+                this.nextOffset.set(targetOffset)
+                info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
+              case None => // nothing to do
+            }
           }
         }
       }
@@ -642,20 +677,21 @@ private[kafka] class Log(val dir: File,
    */
   def truncateAndStartWithNewOffset(newOffset: Long) {
     lock synchronized {
-      val deletedSegments = segments.trunc(segments.view.size)
-      info("Truncate and start log '" + name + "' to " + newOffset)
-      deleteSegments(deletedSegments)
-      segments.append(new LogSegment(dir,
-                                     newOffset,
-                                     indexIntervalBytes = indexIntervalBytes, 
-                                     maxIndexSize = maxIndexSize))
-      this.nextOffset.set(newOffset)
+      if (deleted) {
+        warn("Partition [%s,%d] has been deleted, cannot truncate and start with new offset".format(topic, partition))
+      } else {
+        val deletedSegments = segments.trunc(segments.view.size)
+        info("Truncate and start log '" + name + "' to " + newOffset)
+        deleteSegments(deletedSegments)
+        segments.append(new LogSegment(dir,
+                                       newOffset,
+                                       indexIntervalBytes = indexIntervalBytes,
+                                       maxIndexSize = maxIndexSize))
+        this.nextOffset.set(newOffset)
+      }
     }
   }
 
-  def topicName():String = {
-    name.substring(0, name.lastIndexOf("-"))
-  }
 
   def getLastFlushedTime():Long = {
     return lastflushedTime.get
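One consequence of the append() change above is that callers now see a (-2,-2) sentinel when the log has been deleted, in addition to the existing (-1,-1) for an empty message set. A hedged sketch of how a caller might distinguish the two (log and messages are assumed to be in scope; throwing PartitionAlreadyDeletedException here is illustrative, not what the patch itself does):

    val (firstOffset, lastOffset) = log.append(messages)
    if (firstOffset == -2L)
      // the log was deleted underneath us
      throw new kafka.common.PartitionAlreadyDeletedException("Cannot append to deleted log " + log.name)
    else if (firstOffset == -1L) {
      // empty message set, nothing was appended
    }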
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 497cfdd..055482a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -280,6 +280,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
                   (time.milliseconds - startMs) / 1000 + " seconds")
   }
 
+
+
   /**
    * Close all the logs
    */
@@ -302,6 +304,12 @@ private[kafka] class LogManager(val config: KafkaConfig,
    */
   def allLogs(): Iterable[Log] = logs.values
 
+  def deleteLog(topicAndPartition: TopicAndPartition) {
+    // Pool.get returns null when no log exists for this partition; guard before deleting
+    Option(logs.get(topicAndPartition)).foreach(_.delete())
+    logs.remove(topicAndPartition)
+  }
+
   /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
    */
@@ -311,15 +319,15 @@ private[kafka] class LogManager(val config: KafkaConfig,
       try {
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.logFlushIntervalMs
-        if(logFlushIntervals.contains(log.topicName))
-          logFlushInterval = logFlushIntervals(log.topicName)
-        debug(log.topicName + " flush interval  " + logFlushInterval +
+        if(logFlushIntervals.contains(log.topic))
+          logFlushInterval = logFlushIntervals(log.topic)
+        debug(log.topic + " flush interval  " + logFlushInterval +
                       " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       } catch {
         case e =>
-          error("Error flushing topic " + log.topicName, e)
+          error("Error flushing topic " + log.topic, e)
           e match {
             case _: IOException =>
               fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68e712c..b0dff2b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,13 +114,14 @@ class ReplicaManager(val config: KafkaConfig,
     getReplica(topic, partitionId) match {
       case Some(replica) =>
         replicaFetcherManager.removeFetcher(topic, partitionId)
-        /* TODO: handle deleteLog in a better way */
-        //if (deletePartition)
-        //  logManager.deleteLog(topic, partition)
+        val partition = replica.partition
         leaderPartitionsLock synchronized {
-          leaderPartitions -= replica.partition
+          leaderPartitions.remove(partition)
         }
         allPartitions.remove((topic, partitionId))
+        if (deletePartition) {
+          partition.delete()
+        }
         info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
@@ -239,6 +240,11 @@ class ReplicaManager(val config: KafkaConfig,
       // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
       // have been completely populated before starting the checkpointing there by avoiding weird race conditions
       if (!hwThreadInitialized) {
+        // If the controller did not instruct this broker to initialize replicas for certain partitions in the first leaderAndIsr request,
+        // delete the logs of those partitions.
+        val partitionsToBeDeleted = logManager.allLogs.map(l => TopicAndPartition(l.topic, l.partition)).filter(p => !allPartitions.contains(p.asTuple))
+        partitionsToBeDeleted.foreach(logManager.deleteLog(_))
+        info("Logs for these partitions were deleted: %s".format(partitionsToBeDeleted.mkString(",")))
         startHighWaterMarksCheckPointThread()
         hwThreadInitialized = true
       }
@@ -266,7 +272,6 @@ class ReplicaManager(val config: KafkaConfig,
   private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
                            partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                              "starting the become-follower transition for partition [%s,%d]")
                                .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
@@ -293,7 +298,8 @@ class ReplicaManager(val config: KafkaConfig,
     if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
-      warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+      warn(("While recording the follower position, the partition [%s,%d] hasn't been created or has been deleted, " +
+            "skip updating leader HW").format(topic, partitionId))
     }
   }
 
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c639efb..b20806b 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -476,7 +476,7 @@ object Utils extends Logging {
   }
 
  /**
-   * Format a Map[String, String] as JSON object.
+   * Format a Map[String, String] as a sequence of JSON fields.
    */
   def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
     val jsonFields: mutable.ListBuffer[String] = ListBuffer()
@@ -507,7 +507,7 @@ object Utils extends Logging {
     val builder = new StringBuilder
     builder.append("[ ")
     if (valueInQuotes)
-      builder.append(jsonData.map("\"" + _ + "\"")).mkString(", ")
+      builder.append(jsonData.map("\"" + _ + "\"").mkString(", "))
     else
       builder.append(jsonData.mkString(", "))
     builder.append(" ]")
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ce1904b..5c7b2cf 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -43,6 +43,7 @@ object ZkUtils extends Logging {
   val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val deleteTopicsPath = "/admin/delete_topics"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -420,7 +421,8 @@ object ZkUtils extends Logging {
                       } catch {
                         case e: ZkNoNodeException =>
                           (None, stat)
-                        case e2 => throw e2
+                        case e2 =>
+                          throw e2
                       }
     dataAndStat
   }
@@ -638,11 +640,12 @@ object ZkUtils extends Logging {
     }
   }
 
-  def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
-    val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClient.delete(brokerIdPath)
-    val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
-    zkClient.delete(brokerPartTopicPath)
+  def getTopicsBeingDeleted(zkClient: ZkClient): Set[String] = {
+    val jsonTopicListOpt = readDataMaybeNull(zkClient, deleteTopicsPath)._1
+    jsonTopicListOpt match {
+      case Some(jsonTopicList) => DeleteTopicsCommand.parseDeleteTopicsData(jsonTopicList)
+      case None => Set.empty[String]
+    }
   }
 
   def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] = {

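To close the loop on the ZkUtils additions, a sketch of how the pending-deletion state can be inspected after a controller failover; zkClient is assumed to be an already-connected ZkClient, and this mirrors what initializeAndMaybeTriggerTopicDeletion does in KafkaController above.

    val pending: Set[String] = ZkUtils.getTopicsBeingDeleted(zkClient)
    if (pending.nonEmpty)
      println("Topics still pending deletion under " + ZkUtils.deleteTopicsPath + ": " + pending.mkString(","))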