Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1296510) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -249,10 +249,18 @@ val deletable = view.takeWhile(predicate) for(seg <- deletable) seg.deleted = true - val numToDelete = deletable.size + var numToDelete = deletable.size // if we are deleting everything, create a new empty segment - if(numToDelete == view.size) - roll() + if(numToDelete == view.size) { + if (view(numToDelete - 1).size > 0) + roll() + else { + // If the last segment to be deleted is empty and we roll the log, the new segment will have the same + // file name. So simply reuse the last segment and reset the modified time. + view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds) + numToDelete -=1 + } + } segments.trunc(numToDelete) } } @@ -290,9 +298,12 @@ */ def roll() { lock synchronized { - val last = segments.view.last val newOffset = nextAppendOffset val newFile = new File(dir, Log.nameFromOffset(newOffset)) + if (newFile.exists) { + warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first") + newFile.delete() + } debug("Rolling log '" + name + "' to " + newFile.getName()) segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset)) }