Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1384073)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -89,7 +89,7 @@
 
 
 /**
- * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size 
+ * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
  */
 class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range {
   var firstAppendTime: Option[Long] = None
@@ -428,12 +428,6 @@
     }
   }
 
-
-  def deleteWholeLog():Unit = {
-    deleteSegments(segments.contents.get())
-    Utils.rm(dir)
-  }
-
   /* Attempts to delete all provided segments from a log and returns how many it was able to */
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0
@@ -462,14 +456,25 @@
         info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
       case None =>
         if(targetOffset > segments.view.last.absoluteEndOffset)
-         error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+         error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
+           format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
     }
 
     val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
-    if(segmentsToBeDeleted.size < segments.view.size) {
-      val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
-      if(numSegmentsDeleted != segmentsToBeDeleted.size)
-        error("Failed to delete some segments during log recovery")
+    val viewSize = segments.view.size
+    val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
+
+    if(numSegmentsDeleted != segmentsToBeDeleted.size)
+      error("Failed to delete some segments during log recovery")
+
+    if (numSegmentsDeleted == viewSize) {
+      val newFile = new File(dir, nameFromOffset(targetOffset))
+      if (newFile.exists) {
+        warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
+        newFile.delete()
+      }
+      debug("Rolling log '" + name + "' to " + newFile.getName())
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), targetOffset, time))
     }
   }
 
