diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 20c00cb..805eb72 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -119,7 +119,10 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
-        fetcher.shutdown()
+        fetcher.initiateShutdown()
+      }
+      for ( (_, fetcher) <- fetcherThreadMap) {
+        fetcher.awaitShutdown()
       }
       fetcherThreadMap.clear()
     }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8c281d4..325a185 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -70,6 +70,13 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     simpleConsumer.close()
   }
 
+
+  override def initiateShutdown(): Boolean = {
+    val ret = super.initiateShutdown()
+    simpleConsumer.close()
+    ret
+  }
+
   override def doWork() {
     inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
