Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1394632)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -168,6 +168,7 @@
     replicaFetcherManager.removeFetcher(topic, partitionId)
     // make sure local replica exists
     val localReplica = getOrCreateReplica()
+    info("Truncating log of topic %s partition %d to %d".format(topic, partitionId, localReplica.highWatermark))
     localReplica.log.get.truncateTo(localReplica.highWatermark)
     inSyncReplicas = Set.empty[Replica]
     leaderEpoch = leaderAndISR.leaderEpoch
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1394632)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -91,7 +91,7 @@
 /**
  * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
  */
-class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range {
+class LogSegment(val file: File, var messageSet: FileMessageSet, val start: Long, time: Time) extends Range {
   var firstAppendTime: Option[Long] = None
   @volatile var deleted = false
   /* Return the size in bytes of this log segment */
@@ -119,6 +119,10 @@
    * @param offset Absolute offset for this partition
    */
   def truncateTo(offset: Long) = {
+    if (!messageSet.mutable) {
+      messageSet.close()
+      messageSet = new FileMessageSet(file, true)
+    }
     messageSet.truncateTo(offset - start)
   }
 }
@@ -442,7 +446,7 @@
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0
     for(segment <- segments) {
-      info("Deleting log segment " + segment.file.getName() + " from " + name)
+      info("Deleting log segment %s from %s of size %d".format(segment.file.getName(), name, segment.size))
       swallow(segment.messageSet.close())
       if(!segment.file.delete()) {
         warn("Delete failed.")
@@ -474,8 +478,8 @@
     case Some(segment) =>
       val truncatedSegmentIndex = segments.view.indexOf(segment)
       segments.truncLast(truncatedSegmentIndex)
+      info("Truncated log segment %s to target offset %d".format(segment.file.getAbsolutePath, targetOffset))
      segment.truncateTo(targetOffset)
-      info("Truncated log segment %s to target offset %d".format(segment.file.getAbsolutePath, targetOffset))
     case None =>
       if(targetOffset > segments.view.last.absoluteEndOffset)
         error("Target offset %d cannot be greater than the last message offset %d in the log %s".
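For context: the Log.scala hunk above works because a FileMessageSet opened read-only cannot be truncated, so the patched truncateTo closes it and reopens the backing file mutably first. Below is a minimal, self-contained Scala sketch of that reopen-then-truncate pattern using only standard java.io APIs; reopenAndTruncate is an illustrative helper, not part of the patch or of Kafka.

    import java.io.{File, RandomAccessFile}

    // Illustrative sketch only (not Kafka code): a channel obtained from a
    // read-only handle rejects truncate(), which is why the patch reopens the
    // segment file as a mutable FileMessageSet before calling truncateTo.
    def reopenAndTruncate(file: File, targetSize: Long): Unit = {
      val raf = new RandomAccessFile(file, "rw") // reopen writable, analogous to new FileMessageSet(file, true)
      try raf.getChannel.truncate(targetSize)    // drop everything past targetSize bytes
      finally raf.close()
    }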
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 1394632)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(working copy)
@@ -147,8 +147,20 @@
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
     partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+    val replicas = getAllReplicasForPartition(newPartitions)
+    replicas.foreach {
+      case(topic, partitionId, replicaId) =>
+        replicaStateMachine.replicaState.put((topic, partitionId, replicaId), OnlineReplica)
+    }
   }
 
+  private def getAllReplicasForPartition(partitions: Seq[(String, Int)]): Seq[Tuple3[String, Int, Int]] = {
+    partitions.map { p =>
+      val replicas = controllerContext.partitionReplicaAssignment(p)
+      replicas.map(r => (p._1, p._2, r))
+    }.flatten
+  }
+
   /* TODO: kafka-330 This API is unused until we introduce the delete topic functionality. remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
   def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1394632)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -100,7 +100,7 @@
    * @param replicaId The replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+  def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
     try {
       targetState match {
         case OnlineReplica =>
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1394632)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -42,7 +42,6 @@
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
-  info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
 
   newGauge(
     "LeaderCount",
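For context: the new getAllReplicasForPartition in the KafkaController.scala hunk expands each (topic, partition) into one (topic, partition, replica) triple per assigned replica, so the creation callback can mark every replica online. A rough, self-contained sketch of that expansion follows; the hard-coded assignment map is a stand-in for controllerContext.partitionReplicaAssignment, not real controller state.

    // Sketch only: the sample map stands in for controllerContext.partitionReplicaAssignment.
    val assignment: Map[(String, Int), Seq[Int]] =
      Map(("orders", 0) -> Seq(1, 2), ("orders", 1) -> Seq(2, 3))

    // Same map-then-flatten shape as getAllReplicasForPartition, written as a
    // for-comprehension: one (topic, partition, replica) triple per assigned replica.
    val replicas: Seq[(String, Int, Int)] = for {
      ((topic, partition), replicaIds) <- assignment.toSeq
      replicaId <- replicaIds
    } yield (topic, partition, replicaId)
    // e.g. ("orders", 0, 1), ("orders", 0, 2), ("orders", 1, 2), ("orders", 1, 3)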