diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 429d499..234ae06 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -122,7 +122,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if (canShutdown) {
       logger.info("ZKConsumerConnector shutting down")
       try {
-        scheduler.shutdown()
+        scheduler.shutdownNow()
         fetcher match {
           case Some(f) => f.shutdown()
           case None =>
diff --git core/src/main/scala/kafka/log/LogManager.scala core/src/main/scala/kafka/log/LogManager.scala
index 698a5f3..5bde936 100644
--- core/src/main/scala/kafka/log/LogManager.scala
+++ core/src/main/scala/kafka/log/LogManager.scala
@@ -260,7 +260,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
    * Close all the logs
    */
   def close() {
-    logFlusherScheduler.shutdown
+    logFlusherScheduler.shutdown()
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
@@ -317,7 +317,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
             case _ =>
           }
       }
-    }     
+    }
   }
 
 
diff --git core/src/main/scala/kafka/producer/Producer.scala core/src/main/scala/kafka/producer/Producer.scala
index 3b90644..2774961 100644
--- core/src/main/scala/kafka/producer/Producer.scala
+++ core/src/main/scala/kafka/producer/Producer.scala
@@ -37,6 +37,8 @@ class Producer[K,V](config: ProducerConfig,
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+    logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
   private val zkEnabled = Utils.propertyExists(config.zkConnect)
diff --git core/src/main/scala/kafka/server/KafkaServer.scala core/src/main/scala/kafka/server/KafkaServer.scala
index c7c74ec..1f62272 100644
--- core/src/main/scala/kafka/server/KafkaServer.scala
+++ core/src/main/scala/kafka/server/KafkaServer.scala
@@ -96,7 +96,7 @@ class KafkaServer(val config: KafkaConfig) {
     if (canShutdown) {
       logger.info("Shutting down...")
       try {
-        scheduler.shutdown
+        scheduler.shutdown()
         if (socketServer != null)
           socketServer.shutdown()
         Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))
diff --git core/src/main/scala/kafka/utils/KafkaScheduler.scala core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 397db95..f090fdb 100644
--- core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -36,12 +36,19 @@ class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon:
       t
     }
   })
-  
+  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+
   def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
 
-  def shutdown() = {
-    executor.shutdownNow
+  def shutdownNow() {
+    executor.shutdownNow()
+    logger.info("force shutdown scheduler " + baseThreadName)
+  }
+
+  def shutdown() {
+    executor.shutdown()
     logger.info("shutdown scheduler " + baseThreadName)
   }
 }
