From 8f71f35b5e03a61aaa14f989183f758eb8e7bec2 Mon Sep 17 00:00:00 2001 From: mgharat Date: Mon, 23 Mar 2015 17:46:59 -0700 Subject: [PATCH] Check for corrupt index files and delete them. They can be rebuilt. --- core/src/main/scala/kafka/log/Log.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 06b8ecc..871a662 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -153,6 +153,19 @@ class Log(val dir: File, } } + // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment + // delete any corrupt index file. It will be rebuilt + for (s <- logSegments) { + try { + s.index.sanityCheck() + } + catch { + case e: IllegalArgumentException => + warn("Found a corrupt index file %s. Deleting it, will be rebuilt".format(s.index.file.getAbsolutePath)) + s.index.delete() + } + } + // now do a second pass and load all the .log and .index files for(file <- dir.listFiles if file.isFile) { val filename = file.getName @@ -194,10 +207,6 @@ class Log(val dir: File, // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) } - - // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment - for (s <- logSegments) - s.index.sanityCheck() } private def updateLogEndOffset(messageOffset: Long) { -- 1.9.3 (Apple Git-50)