diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala index b726a7e..cf63df5 100644 --- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala +++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala @@ -77,8 +77,8 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { try { hwFile.length() match { case 0 => - warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + - "partition %d. Returning 0 as the highwatermark".format(partition)) + warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d." + .format(topic, partition)) 0L case _ => val hwFileReader = new BufferedReader(new FileReader(hwFile)) @@ -90,11 +90,9 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { for(i <- 0 until numberOfHighWatermarks) yield { val nextHwEntry = hwFileReader.readLine() val partitionHwInfo = nextHwEntry.split(" ") - val highwaterMark = partitionHwInfo.last.toLong - val partitionId = partitionHwInfo.takeRight(2).head - // find the index of partition - val partitionIndex = nextHwEntry.indexOf(partitionId) - val topic = nextHwEntry.substring(0, partitionIndex-1) + val topic = partitionHwInfo(0) + val partitionId = partitionHwInfo(1) + val highwaterMark = partitionHwInfo(2).toLong (TopicAndPartition(topic, partitionId.toInt) -> highwaterMark) } hwFileReader.close()