Index: core/src/main/scala/kafka/log/OffsetIndex.scala
===================================================================
--- core/src/main/scala/kafka/log/OffsetIndex.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/OffsetIndex.scala	(working copy)
@@ -85,7 +85,7 @@
     }
   
   /* the maximum number of entries this index can hold */
-  val maxEntries = mmap.limit / 8
+  def maxEntries = mmap.limit / 8
   
   /* the number of entries in the index */
   private var size = new AtomicInteger(mmap.position / 8)
@@ -218,6 +218,7 @@
         else
           slot + 1
       this.size.set(newEntries)
+      trimOrReallocate(true)
       mmap.position(this.size.get * 8)
       this.lastOffset = readLastOffset
     }
@@ -227,12 +228,12 @@
    * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
    * the file.
    */
-  def trimToSize() {
+  def trimOrReallocate(isReallocate: Boolean = false) {
     this synchronized {
       flush()
       val raf = new RandomAccessFile(file, "rws")
       try {
-        val newLength = entries * 8
+        val newLength = if(isReallocate) maxIndexSize else entries * 8
         raf.setLength(newLength)
         this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
       } finally {
@@ -262,7 +263,7 @@
   
   /** Close the index */
   def close() {
-    trimToSize()
+    trimOrReallocate()
   }
   
   /**
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -444,9 +444,14 @@
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.trimToSize()
+      case Some(segment) => segment.index.trimOrReallocate()
       case None => 
     }
+    segments.view.foreach(s => {
+      if(s.index.file.getName.split("\\.")(0).toLong == newOffset)
+        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
+    })
+
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
                                  indexIntervalBytes = indexIntervalBytes, 
