diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 429d499..234ae06 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -122,7 +122,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if (canShutdown) {
       logger.info("ZKConsumerConnector shutting down")
       try {
-        scheduler.shutdown()
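+        // shutdownNow() drops scheduled tasks that have not started instead of waiting for them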
+        scheduler.shutdownNow()
         fetcher match {
           case Some(f) => f.shutdown()
           case None =>
diff --git core/src/main/scala/kafka/log/Log.scala core/src/main/scala/kafka/log/Log.scala
index c403770..d4c074d 100644
--- core/src/main/scala/kafka/log/Log.scala
+++ core/src/main/scala/kafka/log/Log.scala
@@ -298,9 +298,8 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int,
   /**
    * Flush this log file to the physical disk
    */
-  def flush() : Unit = {
-    if (unflushed.get == 0) return
-
+  def flush(): Unit = {
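+    // flush unconditionally, even if nothing has been appended since the last flush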
     lock synchronized {
       if(logger.isDebugEnabled)
         logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
diff --git core/src/main/scala/kafka/log/LogManager.scala core/src/main/scala/kafka/log/LogManager.scala
index 698a5f3..c8c52ef 100644
--- core/src/main/scala/kafka/log/LogManager.scala
+++ core/src/main/scala/kafka/log/LogManager.scala
@@ -51,7 +51,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
-  private val logRetentionSize = config.logRetentionSize
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -194,51 +193,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
     log
   }
   
-  /* Attemps to delete all provided segments from a log and returns how many it was able to */
-  private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
-    var total = 0
-    for(segment <- segments) {
-      logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-      Utils.swallow(logger.warn, segment.messageSet.close())
-      if(!segment.file.delete()) {
-        logger.warn("Delete failed.")
-      } else {
-        total += 1
-      }
-    }
-    total
-  }
-
-  /* Runs through the log removing segments older than a certain age */
-  private def cleanupExpiredSegments(log: Log): Int = {
-    val startMs = time.milliseconds
-    val topic = Utils.getTopicPartition(log.dir.getName)._1
-    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
-    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
-    val total = deleteSegments(log, toBeDeleted)
-    total
-  }
-
-  /**
-   *  Runs through the log removing segments until the size of the log
-   *  is at least logRetentionSize bytes in size
-   */
-  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
-    var diff = log.size - logRetentionSize
-    def shouldDelete(segment: LogSegment) = {
-      if(diff - segment.size >= 0) {
-        diff -= segment.size
-        true
-      } else {
-        false
-      }
-    }
-    val toBeDeleted = log.markDeletedWhile( shouldDelete )
-    val total = deleteSegments(log, toBeDeleted)
-    total
-  }
-
   /**
    * Delete any eligible logs. Return the number of segments deleted.
    */
@@ -250,7 +204,19 @@ private[kafka] class LogManager(val config: KafkaConfig,
     while(iter.hasNext) {
       val log = iter.next
       logger.debug("Garbage collecting '" + log.name + "'")
-      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
+      val topic = Utils.getTopicPartition(log.dir.getName)._1
+      val logCleanupThresholdMS =
+        logRetentionMSMap.getOrElse(topic, this.logCleanupDefaultAgeMs)
+      val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
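+      // close and delete each expired segment, counting only successful deletions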
+      for(segment <- toBeDeleted) {
+        logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+        Utils.swallow(logger.warn, segment.messageSet.close())
+        if(!segment.file.delete())
+          logger.warn("Delete failed.")
+        else
+          total += 1
+      }
     }
     logger.debug("Log cleanup completed. " + total + " files deleted in " + 
                  (time.milliseconds - startMs) / 1000 + " seconds")
@@ -260,7 +226,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
    * Close all the logs
    */
   def close() {
-    logFlusherScheduler.shutdown
+    logFlusherScheduler.shutdown()
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
@@ -317,7 +283,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
             case _ =>
           }
       }
-    }     
+    }
   }
 
 
diff --git core/src/main/scala/kafka/producer/Producer.scala core/src/main/scala/kafka/producer/Producer.scala
index 3b90644..2774961 100644
--- core/src/main/scala/kafka/producer/Producer.scala
+++ core/src/main/scala/kafka/producer/Producer.scala
@@ -37,6 +37,8 @@ class Producer[K,V](config: ProducerConfig,
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+    logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
   private val zkEnabled = Utils.propertyExists(config.zkConnect)
diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala
index 8577f42..08307d3 100644
--- core/src/main/scala/kafka/server/KafkaConfig.scala
+++ core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -64,9 +64,6 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
   
-  /* the maximum size of the log before deleting it */
-  val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
-
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
 
diff --git core/src/main/scala/kafka/server/KafkaServer.scala core/src/main/scala/kafka/server/KafkaServer.scala
index c7c74ec..1f62272 100644
--- core/src/main/scala/kafka/server/KafkaServer.scala
+++ core/src/main/scala/kafka/server/KafkaServer.scala
@@ -96,7 +96,8 @@ class KafkaServer(val config: KafkaConfig) {
     if (canShutdown) {
       logger.info("Shutting down...")
       try {
-        scheduler.shutdown
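+        // plain shutdown(): a running task may finish, but queued tasks will not start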
+        scheduler.shutdown()
         if (socketServer != null)
           socketServer.shutdown()
         Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))
diff --git core/src/main/scala/kafka/utils/KafkaScheduler.scala core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 397db95..f090fdb 100644
--- core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -36,12 +36,20 @@ class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon:
       t
     }
   })
-  
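+  // after shutdown(), discard periodic and delayed tasks that have not yet started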
+  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+
   def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) =
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
 
-  def shutdown() = {
-    executor.shutdownNow
+  def shutdownNow() {
+    executor.shutdownNow()
+    logger.info("force shutdown scheduler " + baseThreadName)
+  }
+
+  def shutdown() {
+    executor.shutdown()
     logger.info("shutdown scheduler " + baseThreadName)
   }
 }
diff --git core/src/test/scala/unit/kafka/log/LogManagerTest.scala core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index e088c98..3cc83e3 100644
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -59,7 +59,7 @@ class LogManagerTest extends JUnitSuite {
 
 
   @Test
-  def testCleanupExpiredSegments() {
+  def testCleanup() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
     for(i <- 0 until 1000) {
@@ -87,54 +87,6 @@ class LogManagerTest extends JUnitSuite {
   }
 
   @Test
-  def testCleanupSegmentsToMaintainSize() {
-    val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
-    val retentionHours = 1
-    val retentionMs = 1000 * 60 * 60 * retentionHours
-    val props = TestUtils.createBrokerConfig(0, -1)
-    logManager.close
-    Thread.sleep(100)
-    config = new KafkaConfig(props) {
-      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
-      override val enableZookeeper = false
-      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
-      override val logRetentionHours = retentionHours
-    }
-    logManager = new LogManager(config, null, time, -1, retentionMs, false)
-    logManager.startup
-
-    // create a log
-    val log = logManager.getOrCreateLog("cleanup", 0)
-    var offset = 0L
-
-    // add a bunch of messages that should be larger than the retentionSize
-    for(i <- 0 until 1000) {
-      val set = TestUtils.singleMessageSet("test".getBytes())
-      log.append(set)
-      offset += set.sizeInBytes
-    }
-    // flush to make sure it's written to disk, then sleep to confirm
-    log.flush
-    Thread.sleep(2000)
-
-    // should be exactly 100 full segments + 1 new empty one
-    assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
-
-    // this cleanup shouldn't find any expired segments but should delete some to reduce size
-    logManager.cleanupLogs()
-    assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
-    try {
-      log.read(0, 1024)
-      fail("Should get exception from fetching earlier.")
-    } catch {
-      case e: OffsetOutOfRangeException => "This is good."
-    }
-    // log should still be appendable
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
-  }
-
-  @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.close
