From 80caff3cadd2c495fc27fb475295faeccfd64498 Mon Sep 17 00:00:00 2001 From: mgharat Date: Tue, 17 Mar 2015 13:55:10 -0700 Subject: [PATCH] The JVM should stop if the underlying file system goes in to Read only mode --- .../main/scala/kafka/server/OffsetCheckpoint.scala | 64 ++++++++++++---------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index 8c5b054..3b3dc73 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -31,39 +31,45 @@ class OffsetCheckpoint(val file: File) extends Logging { def write(offsets: Map[TopicAndPartition, Long]) { lock synchronized { - // write to temp file and then swap with the existing file - val temp = new File(file.getAbsolutePath + ".tmp") - - val fileOutputStream = new FileOutputStream(temp) - val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)) try { - // write the current version - writer.write(0.toString) - writer.newLine() - - // write the number of entries - writer.write(offsets.size.toString) - writer.newLine() + // write to temp file and then swap with the existing file + val temp = new File(file.getAbsolutePath + ".tmp") + + val fileOutputStream = new FileOutputStream(temp) + val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)) + try { + // write the current version + writer.write(0.toString) + writer.newLine() - // write the entries - offsets.foreach { case (topicPart, offset) => - writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset)) + // write the number of entries + writer.write(offsets.size.toString) writer.newLine() + + // write the entries + offsets.foreach { case (topicPart, offset) => + writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset)) + writer.newLine() + } + + // flush the buffer and then fsync the underlying file + writer.flush() + fileOutputStream.getFD().sync() + } finally { + writer.close() } - - // flush the buffer and then fsync the underlying file - writer.flush() - fileOutputStream.getFD().sync() - } finally { - writer.close() - } - - // swap new offset checkpoint file with previous one - if(!temp.renameTo(file)) { - // renameTo() fails on Windows if the destination file exists. - file.delete() - if(!temp.renameTo(file)) - throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) + + // swap new offset checkpoint file with previous one + if(!temp.renameTo(file)) { + // renameTo() fails on Windows if the destination file exists. + file.delete() + if(!temp.renameTo(file)) + throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) + } + } catch { + case e: FileNotFoundException => + fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e) + Runtime.getRuntime.halt(1) } } } -- 1.9.3 (Apple Git-50)