Index: core/src/main/scala/kafka/log/OffsetIndex.scala
===================================================================
--- core/src/main/scala/kafka/log/OffsetIndex.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/OffsetIndex.scala	(working copy)
@@ -54,30 +54,20 @@
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
     {
-      val newlyCreated = file.createNewFile()
       val raf = new RandomAccessFile(file, "rw")
       try {
-        /* pre-allocate the file if necessary */
-        if(newlyCreated) {
-          if(maxIndexSize < 8)
-            throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-        }
-          
-        val len = raf.length()  
+        val len = raf.length()
         if(len < 0 || len % 8 != 0)
           throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
                                           " bytes which is not positive or not a multiple of 8.")
-          
+
+        // We always allocate up to the maxIndexSize disk space and memory for the offset index, whether it's
+        // loaded from the disk or a newly created one
+        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
+
         /* memory-map the file */
-        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-          
-        /* set the position in the index for the next entry */
-        if(newlyCreated)
-          idx.position(0)
-        else
-          // if this is a pre-existing index, assume it is all valid and set position to last entry
-          idx.position(roundToExactMultiple(idx.limit, 8))
+        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, roundToExactMultiple(maxIndexSize, 8))
+        idx.position(roundToExactMultiple(len.toInt, 8))
         idx
       } finally {
         Utils.swallow(raf.close())
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1403534)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -447,7 +447,12 @@
       case Some(segment) => segment.index.trimToSize()
       case None => 
     }
-    val segment = new LogSegment(dir, 
+
+    val segmentsView = segments.view
+    if(segmentsView.length > 0 && segmentsView.last.index.file.getName.split("\\.")(0).toLong == newOffset)
+      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
+
+    val segment = new LogSegment(dir,
                                  startOffset = newOffset,
                                  indexIntervalBytes = indexIntervalBytes, 
                                  maxIndexSize = maxIndexSize)
