Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1241331)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -34,7 +34,7 @@
  * Helper functions!
  */
 object Utils extends Logging {
-  
+
   /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1241331)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -36,10 +36,12 @@
  */
 @threadsafe
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
-  
+
   private val MaxConnectBackoffMs = 60000
   private var channel : SocketChannel = null
   private var sentOnConnection = 0
+  private var lastConnectionTime = System.currentTimeMillis
+
   private val lock = new Object()
   @volatile
   private var shutdown: Boolean = false
@@ -77,7 +79,7 @@
    * Common functionality for the public send methods
    */
   private def send(send: BoundedByteBufferSend) {
-    lock synchronized {
+   lock synchronized {
       verifySendBuffer(send.buffer.slice)
       val startTime = SystemTime.nanoseconds
       getOrMakeConnection()
@@ -94,10 +96,12 @@
       }
       // TODO: do we still need this?
       sentOnConnection += 1
-      if(sentOnConnection >= config.reconnectInterval) {
+
+      if(sentOnConnection >= config.reconnectInterval || System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval) {
         disconnect()
         channel = connect()
         sentOnConnection = 0
+        lastConnectionTime = System.currentTimeMillis
       }
       val endTime = SystemTime.nanoseconds
       SyncProducerStats.recordProduceRequest(endTime - startTime)
@@ -153,7 +157,7 @@
       case e: Exception => error("Error on disconnect: ", e)
     }
   }
-    
+
   private def connect(): SocketChannel = {
     var connectBackoffMs = 1
     val beginTimeMs = SystemTime.milliseconds
Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(revision 1241331)
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(working copy)
@@ -40,5 +40,7 @@
 
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
+  var reconnectTimeInterval = Utils.getInt(props, "reconnect.timeInterval", 1000)
+
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
 }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1241331)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -86,6 +86,7 @@
       def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
 
       def makeNextOuter: MessageAndOffset = {
+
         if (topIter.remaining < 4) {
           return allDone()
         }
