diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a2ea5a9..5294d8e 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -82,6 +82,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
+  private val shutdownLock = new ReentrantLock
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
@@ -153,32 +154,37 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def shutdown() {
-    val canShutdown = isShuttingDown.compareAndSet(false, true);
-    if (canShutdown) {
-      info("ZKConsumerConnector shutting down")
-
-      if (wildcardTopicWatcher != null)
-        wildcardTopicWatcher.shutdown()
-      try {
-        if (config.autoCommitEnable)
-          scheduler.shutdownNow()
-        fetcher match {
-          case Some(f) => f.stopConnections
-          case None =>
-        }
-        sendShutdownToAllQueues()
-        if (config.autoCommitEnable)
-          commitOffsets()
-        if (zkClient != null) {
-          zkClient.close()
-          zkClient = null
-        }
-      } catch {
-        case e =>
-          fatal("error during consumer connector shutdown", e)
+    shutdownLock.lock()
+    try {
+      val canShutdown = isShuttingDown.compareAndSet(false, true);
+      if (canShutdown) {
+		info("ZKConsumerConnector shutting down")
+
+		if (wildcardTopicWatcher != null)
+          wildcardTopicWatcher.shutdown()
+		try {
+          if (config.autoCommitEnable)
+			scheduler.shutdownNow()
+          fetcher match {
+			case Some(f) => f.stopConnections
+			case None =>
+          }
+          sendShutdownToAllQueues()
+          if (config.autoCommitEnable)
+			commitOffsets()
+          if (zkClient != null) {
+			zkClient.close()
+			zkClient = null
+          }
+		} catch {
+          case e =>
+			fatal("error during consumer connector shutdown", e)
+		}
+		info("ZKConsumerConnector shut down completed")
       }
-      info("ZKConsumerConnector shut down completed")
-    }
+	} finally {
+      shutdownLock.unlock()
+	}
   }
 
   def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
@@ -325,8 +331,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               isWatcherTriggered = false
               lock.unlock()
             }
-            if (doRebalance)
-              syncedRebalance
+            shutdownLock.lock()
+            try {
+              if (doRebalance && !isShuttingDown.get())
+				syncedRebalance
+			} finally {
+              shutdownLock.unlock()
+			}
           } catch {
             case t => error("error during syncedRebalance", t)
           }
