Index: core/src/main/scala/kafka/log/OffsetIndex.scala
===================================================================
--- core/src/main/scala/kafka/log/OffsetIndex.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/OffsetIndex.scala	(working copy)
@@ -49,7 +49,7 @@
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) extends Logging {
+class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
@@ -85,7 +85,7 @@
     }
   
   /* the maximum number of entries this index can hold */
-  val maxEntries = mmap.limit / 8
+  def maxEntries = mmap.limit / 8
   
   /* the number of entries in the index */
   private var size = new AtomicInteger(mmap.position / 8)
@@ -227,14 +227,15 @@
    * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
    * the file.
    */
-  def trimToSize() {
+  def trimToValidSize() = resetSizeTo(entries * 8)
+
+  def resetSizeTo(newSize: Int) {
     this synchronized {
       flush()
       val raf = new RandomAccessFile(file, "rws")
       try {
-        val newLength = entries * 8
-        raf.setLength(newLength)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
+        raf.setLength(newSize)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSize)
       } finally {
         Utils.swallow(raf.close())
       }
@@ -262,7 +263,7 @@
   
   /** Close the index */
   def close() {
-    trimToSize()
+    trimToValidSize()
   }
   
   /**
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -174,6 +174,9 @@
         }
       })
 
+      // reset the index size of the last (currently active) log segment to its maximum value
+      logSegments.get(logSegments.size() - 1).index.resetSizeTo(maxIndexSize)
+
       // run recovery on the last segment if necessary
       if(needsRecovery)
         recoverSegment(logSegments.get(logSegments.size - 1))
@@ -444,9 +447,14 @@
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.trimToSize()
+      case Some(segment) => segment.index.trimToValidSize()
       case None => 
     }
+
+    val segmentsView = segments.view
+    if(segmentsView.size > 0 && segmentsView.last.index.file.getName.split("\\.")(0).toLong == newOffset)
+      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
+
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
                                  indexIntervalBytes = indexIntervalBytes, 
Index: core/src/main/scala/kafka/log/LogSegment.scala
===================================================================
--- core/src/main/scala/kafka/log/LogSegment.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/LogSegment.scala	(working copy)
@@ -116,7 +116,9 @@
     val mapping = translateOffset(offset)
     if(mapping == null)
       return
-    index.truncateTo(offset)  
+    index.truncateTo(offset)
+    // after truncation, reset and allocate more space for the (new, currently active) index
+    index.resetSizeTo(index.maxIndexSize)
     messageSet.truncateTo(mapping.position)
   }
   
