diff --git a/kafka/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/kafka/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala index c1527f8..5459ccd 100644 --- a/kafka/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala +++ b/kafka/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala @@ -85,33 +85,42 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { 0L case _ => val hwFileReader = new BufferedReader(new FileReader(hwFile)) - val version = hwFileReader.readLine().toShort - version match { - case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion => - val numberOfHighWatermarks = hwFileReader.readLine().toInt - val partitionHighWatermarks = - for(i <- 0 until numberOfHighWatermarks) yield { - val nextHwEntry = hwFileReader.readLine() - val partitionHwInfo = nextHwEntry.split(" ") - val topic = partitionHwInfo(0) - val partitionId = partitionHwInfo(1).toInt - val highWatermark = partitionHwInfo(2).toLong - (TopicAndPartition(topic, partitionId) -> highWatermark) + val line = hwFileReader.readLine() + try { + val version = line.toShort + version match { + case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion => + val numberOfHighWatermarks = hwFileReader.readLine().toInt + val partitionHighWatermarks = + for(i <- 0 until numberOfHighWatermarks) yield { + val nextHwEntry = hwFileReader.readLine() + val partitionHwInfo = nextHwEntry.split(" ") + val topic = partitionHwInfo(0) + val partitionId = partitionHwInfo(1).toInt + val highWatermark = partitionHwInfo(2).toLong + (TopicAndPartition(topic, partitionId) -> highWatermark) + } + hwFileReader.close() + val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition)) + hwOpt match { + case Some(hw) => + debug("Read hw %d for partition [%s,%d] from highwatermark checkpoint file".format(hw, topic, partition)) + hw + case None => + warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + + "partition %d. Returning 0 as the highwatermark".format(partition)) + 0L } - hwFileReader.close() - val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition)) - hwOpt match { - case Some(hw) => - debug("Read hw %d for partition [%s,%d] from highwatermark checkpoint file".format(hw, topic, partition)) - hw - case None => - warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + - "partition %d. Returning 0 as the highwatermark".format(partition)) - 0L - } - case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version) - System.exit(1) - -1L + case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version) + System.exit(1) + -1L + } + } catch { + case ex: java.lang.NumberFormatException => { + fatal("NumberFormatException: isr input string is corrupted " + line) + System.exit(1) + -1L + } } } }finally {