Index: core/src/main/scala/kafka/utils/KafkaScheduler.scala
===================================================================
--- core/src/main/scala/kafka/utils/KafkaScheduler.scala	(revision 1171219)
+++ core/src/main/scala/kafka/utils/KafkaScheduler.scala	(working copy)
@@ -36,12 +36,19 @@
       t
     }
   })
-  
+  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+
   def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
 
-  def shutdown() = {
-    executor.shutdownNow
+  def shutdownNow() {
+    executor.shutdownNow()
+    logger.info("force shutdown scheduler " + baseThreadName)
+  }
+
+  def shutdown() {
+    executor.shutdown()
     logger.info("shutdown scheduler " + baseThreadName)
   }
 }
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1171219)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -226,7 +226,7 @@
    * Close all the logs
    */
   def close() {
-    logFlusherScheduler.shutdown
+    logFlusherScheduler.shutdown()
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
@@ -283,7 +283,7 @@
             case _ =>
           }
       }
-    }     
+    }
   }
 
 
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1171219)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -37,6 +37,8 @@
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+    logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
   private val zkEnabled = Utils.propertyExists(config.zkConnect)
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1171219)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -96,7 +96,7 @@
     if (canShutdown) {
       logger.info("Shutting down...")
       try {
-        scheduler.shutdown
+        scheduler.shutdown()
         if (socketServer != null)
           socketServer.shutdown()
         Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))
