diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 8ff4bd5..a167756 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -182,7 +182,7 @@ object AdminUtils extends Logging { private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { try { val zkPath = ZkUtils.getTopicPath(topic) - val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) + val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2))) if (!update) { info("Topic creation " + jsonPartitionData.toString) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 26beb96..9b3c6ae 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -89,13 +89,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging { def writePreferredReplicaElectionData(zkClient: ZkClient, partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) { val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath - var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]() - for (p <- partitionsUndergoingPreferredReplicaElection) { - partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++ - Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false)) - } - val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false) - val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false) + val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition)) + val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList)) try { ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) info("Created preferred replica election path with %s".format(jsonData)) diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 981d2bb..3401afa 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -37,11 +37,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion) override def toString(): String = { - val jsonDataMap = new collection.mutable.HashMap[String, String] - jsonDataMap.put("leader", leader.toString) - jsonDataMap.put("leaderEpoch", leaderEpoch.toString) - jsonDataMap.put("ISR", isr.mkString(",")) - Utils.mapToJson(jsonDataMap, valueInQuotes = true) + Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr)) } } diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index a3eb53e..e332633 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -25,7 +25,7 @@ import kafka.common.KafkaException private[kafka] trait TopicCount { def getConsumerThreadIdsPerTopic: Map[String, Set[String]] - def dbString: String + def getTopicCountMap: Map[String, Int] def pattern: String protected def makeConsumerThreadIdsPerTopic(consumerIdString: String, @@ -111,24 +111,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String, } } - /** - * return json of - * { "topic1" : 4, - * "topic2" : 4 } - */ - def dbString = { - val builder = new StringBuilder - builder.append("{ ") - var i = 0 - for ( (topic, nConsumers) <- topicCountMap) { - if (i > 0) - builder.append(",") - builder.append("\"" + topic + "\": " + nConsumers) - i += 1 - } - builder.append(" }") - builder.toString() - } + def getTopicCountMap = topicCountMap def pattern = TopicCount.staticPattern } @@ -142,7 +125,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams) + def getTopicCountMap = Map(topicFilter.regex -> numStreams) def pattern: String = { topicFilter match { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index c0350cd..6d0cfa6 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -220,11 +220,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) { info("begin registering consumer " + consumerIdString + " in ZK") val timestamp = SystemTime.milliseconds.toString - val consumerRegistrationInfo = - Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false) - ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true)) + val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, + "timestamp" -> timestamp)) - createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) + createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, + (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) info("end registering consumer " + consumerIdString + " in ZK") } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 88792c2..4c319ab 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -722,7 +722,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) { try { val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic) - val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2))) + val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2))) ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index e1f8b97..6c099da 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -123,8 +123,6 @@ class FileMessageSet private[kafka](@volatile var file: File, if(offset >= targetOffset) return OffsetPosition(offset, position) val messageSize = buffer.getInt() - if(messageSize < Message.MessageOverhead) - throw new IllegalStateException("Invalid message size: " + messageSize) position += MessageSet.LogOverhead + messageSize } null diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 1883a53..9205128 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -155,19 +155,26 @@ class Log(val dir: File, activeSegment.index.resize(config.maxIndexSize) } - // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment - for (s <- logSegments) - s.index.sanityCheck() + // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset. + for (s <- logSegments) { + require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, + "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" + .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) + } } private def recoverLog() { - // if we have the clean shutdown marker, skip recovery - if(hasCleanShutdownFile) { - this.recoveryPoint = activeSegment.nextOffset + val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L} + val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists() + if(!needsRecovery) { + this.recoveryPoint = lastOffset + return + } + if(lastOffset <= this.recoveryPoint) { + info("Log '%s' is fully intact, skipping recovery".format(name)) + this.recoveryPoint = lastOffset return } - - // okay we need to actually recover this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { val curr = unflushed.next @@ -189,11 +196,6 @@ class Log(val dir: File, } } } - - /** - * Check if we have the "clean shutdown" file - */ - private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists() /** * The number of segments in the log. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 81be88a..390b759 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -175,8 +175,6 @@ class LogManager(val logDirs: Array[File], allLogs.foreach(_.close()) // update the last flush point checkpointRecoveryPointOffsets() - // mark that the shutdown was clean by creating the clean shutdown marker file - logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())) } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 96571b3..2f4e303 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -69,8 +69,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi raf.setLength(roundToExactMultiple(maxIndexSize, 8)) } + val len = raf.length() + if(len < 0 || len % 8 != 0) + throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + + " bytes which is not positive or not a multiple of 8.") + /* memory-map the file */ - val len = raf.length() val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) /* set the position in the index for the next entry */ @@ -95,20 +99,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi var maxEntries = mmap.limit / 8 /* the last offset in the index */ - var lastOffset = readLastEntry.offset + var lastOffset = readLastOffset() debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) /** - * The last entry in the index + * The last offset written to the index */ - def readLastEntry(): OffsetPosition = { + private def readLastOffset(): Long = { inLock(lock) { - size.get match { - case 0 => OffsetPosition(baseOffset, 0) - case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1)) - } + val offset = + size.get match { + case 0 => 0 + case s => relativeOffset(this.mmap, s-1) + } + baseOffset + offset } } @@ -173,7 +179,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /* return the nth offset relative to the base offset */ private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8) - /* return the nth physical position */ + /* return the nth physical offset */ private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4) /** @@ -252,7 +258,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi inLock(lock) { this.size.set(entries) mmap.position(this.size.get * 8) - this.lastOffset = readLastEntry.offset + this.lastOffset = readLastOffset } } @@ -345,20 +351,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } /** - * Do a basic sanity check on this index to detect obvious problems - * @throw IllegalArgumentException if any problems are found - */ - def sanityCheck() { - require(entries == 0 || lastOffset > baseOffset, - "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" - .format(file.getAbsolutePath, lastOffset, baseOffset)) - val len = file.length() - require(len % 8 == 0, - "Index file " + file.getAbsolutePath + " is corrupt, found " + len + - " bytes which is not positive or not a multiple of 8.") - } - - /** * Round a number to the greatest exact multiple of the given factor less than the given number. * E.g. roundToExactMultiple(67, 8) == 64 */ diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 33b7360..cc6f1eb 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -17,7 +17,7 @@ package kafka.server import kafka.utils.ZkUtils._ -import kafka.utils.{Utils, SystemTime, Logging} +import kafka.utils.{Json, Utils, SystemTime, Logging} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.IZkDataListener import kafka.controller.ControllerContext @@ -49,9 +49,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: def elect: Boolean = { val timestamp = SystemTime.milliseconds.toString - val electString = - Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false) - ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true)) + val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) try { createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index c9ca95f..a89b046 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -446,65 +446,6 @@ object Utils extends Logging { def nullOrEmpty(s: String): Boolean = s == null || s.equals("") /** - * Merge JSON fields of the format "key" : value/object/array. - */ - def mergeJsonFields(objects: Seq[String]): String = { - val builder = new StringBuilder - builder.append("{ ") - builder.append(objects.sorted.map(_.trim).mkString(", ")) - builder.append(" }") - builder.toString - } - - /** - * Format a Map[String, String] as JSON object. - */ - def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = { - val jsonFields: mutable.ListBuffer[String] = ListBuffer() - val builder = new StringBuilder - for ((key, value) <- jsonDataMap.toList.sorted) { - builder.append("\"" + key + "\":") - if (valueInQuotes) - builder.append("\"" + value + "\"") - else - builder.append(value) - jsonFields += builder.toString - builder.clear() - } - jsonFields - } - - /** - * Format a Map[String, String] as JSON object. - */ - def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = { - mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes)) - } - - /** - * Format a Seq[String] as JSON array. - */ - def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = { - val builder = new StringBuilder - builder.append("[ ") - if (valueInQuotes) - builder.append(jsonData.map("\"" + _ + "\"").mkString(", ")) - else - builder.append(jsonData.mkString(", ")) - builder.append(" ]") - builder.toString - } - - /** - * Format a Map[String, Seq[Int]] as JSON - */ - - def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = { - mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))), - valueInQuotes = false)) - } - - /** * Create a circular (looping) iterator over a collection. * @param coll An iterable over the underlying collection. * @return A circular iterator over the collection. diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 856d136..73902b2 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -32,10 +32,11 @@ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext import kafka.controller.PartitionAndReplica import kafka.controller.KafkaController -import scala.Some +import scala.{collection, Some} import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition import kafka.utils.Utils.inLock +import scala.collection object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -192,11 +193,8 @@ object ZkUtils extends Logging { def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id - val timestamp = "\"" + SystemTime.milliseconds.toString + "\"" - val brokerInfo = - Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++ - Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp), - valueInQuotes = false)) + val timestamp = SystemTime.milliseconds.toString + val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) val expectedBroker = new Broker(id, host, port) try { @@ -219,18 +217,17 @@ object ZkUtils extends Logging { topicDirs.consumerOwnerDir + "/" + partition } + def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { - val isrInfo = Utils.seqToJson(leaderAndIsr.isr.map(_.toString), valueInQuotes = false) - Utils.mapToJson(Map("version" -> 1.toString, "leader" -> leaderAndIsr.leader.toString, "leader_epoch" -> leaderAndIsr.leaderEpoch.toString, - "controller_epoch" -> controllerEpoch.toString, "isr" -> isrInfo), valueInQuotes = false) + Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, + "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) } /** * Get JSON partition to replica map from zookeeper. */ - def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = { - val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map) - Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false) + def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = { + Json.encode(Map("version" -> 1, "partitions" -> map)) } /** @@ -656,16 +653,8 @@ object ZkUtils extends Logging { } def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { - var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]() - for (p <- partitionsToBeReassigned) { - val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false) - val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true) - val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData), - valueInQuotes = false) - jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData) - } - Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)), - valueInQuotes = false) + Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition, + "replicas" -> e._2)))) } def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 6b76037..5f2c2e8 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -212,14 +212,15 @@ class LogSegmentTest extends JUnit3Suite { */ @Test def testRecoveryWithCorruptMessage() { + val rand = new Random(1) val messagesAppended = 20 for(iteration <- 0 until 10) { val seg = createSegment(0) for(i <- 0 until messagesAppended) seg.append(i, messages(i, i.toString)) - val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) + val offsetToBeginCorruption = rand.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end - val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15) + val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) seg.recover(64*1024) assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..1571f1e 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -592,29 +592,29 @@ class LogTest extends JUnitSuite { val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) val set = TestUtils.singleMessageSet("test".getBytes()) val recoveryPoint = 50L - for(iteration <- 0 until 50) { + for(iteration <- 0 until 10) { // create a log and write some messages to it - logDir.mkdirs() var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) - val numMessages = 50 + TestUtils.random.nextInt(50) - for(i <- 0 until numMessages) + for(i <- 0 until 100) log.append(set) - val messages = log.logSegments.flatMap(_.log.iterator.toList) + val seg = log.logSegments(0, recoveryPoint).last + val index = seg.index + val messages = seg.log + val filePosition = messages.searchFor(recoveryPoint, 0).position + val indexPosition = index.lookup(recoveryPoint).position log.close() - // corrupt index and log by appending random bytes - TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1) - TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) + // corrupt file + TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition) + TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition) // attempt recovery log = new Log(logDir, config, recoveryPoint, time.scheduler, time) - assertEquals(numMessages, log.logEndOffset) - assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) - Utils.rm(logDir) + assertEquals(recoveryPoint, log.logEndOffset) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d88b6c3..777b315 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -518,15 +518,9 @@ object TestUtils extends Logging { def writeNonsenseToFile(fileName: File, position: Long, size: Int) { val file = new RandomAccessFile(fileName, "rw") file.seek(position) + val rand = new Random for(i <- 0 until size) - file.writeByte(random.nextInt(255)) - file.close() - } - - def appendNonsenseToFile(fileName: File, size: Int) { - val file = new FileOutputStream(fileName, true) - for(i <- 0 until size) - file.write(random.nextInt(255)) + file.writeByte(rand.nextInt(255)) file.close() }