diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index decba0e..e055d67 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -80,4 +80,4 @@ else JAVA="$JAVA_HOME/bin/java" fi -$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@ +$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@" diff --git a/config/log4j.properties b/config/log4j.properties index 5692da0..b76bc94 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + # Turn on all our debugging info #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender @@ -53,5 +59,10 @@ log4j.additivity.kafka.network.RequestChannel$=false log4j.logger.kafka.request.logger=TRACE, requestAppender log4j.additivity.kafka.request.logger=false -log4j.logger.kafka.controller=TRACE, stateChangeAppender +log4j.logger.kafka.controller=TRACE, controllerAppender log4j.additivity.kafka.controller=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false + + diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index dc4ed8e..a807c1f 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -59,13 +59,13 @@ object FetchRequest { } case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion, - correlationId: Int = FetchRequest.DefaultCorrelationId, + override val correlationId: Int = FetchRequest.DefaultCorrelationId, clientId: String = ConsumerConfig.DefaultClientId, replicaId: Int = Request.OrdinaryConsumerId, maxWait: Int = FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) - extends RequestOrResponse(Some(RequestKeys.FetchKey)) { + extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) { /** * Partitions the request info into a map of maps (one for each topic). 
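Note: the recurring change in the request/response classes below hoists correlationId into the RequestOrResponse base constructor, so code that only sees the abstract type (e.g. the controller's RequestSendThread) can read and log it for any message. A minimal self-contained sketch of that Scala pattern, using hypothetical Envelope/PingRequest/PingResponse names rather than the actual Kafka classes:

// Base class takes the correlation id as a constructor val, mirroring RequestOrResponse.
abstract class Envelope(val requestId: Option[Short] = None, val correlationId: Int)

// A request: the case-class field satisfies the base val via `override val`,
// and the same value is passed up through the superclass constructor.
case class PingRequest(override val correlationId: Int, clientId: String)
  extends Envelope(Some(0.toShort), correlationId)

// A response: no request key, so only the correlation id is passed up (by name,
// because requestId has a default value).
case class PingResponse(override val correlationId: Int, errorCode: Short)
  extends Envelope(correlationId = correlationId)

object EnvelopeDemo extends App {
  // Code that only sees the abstract type can still read the correlation id,
  // which is what the state-change logging in this patch relies on.
  val messages: Seq[Envelope] = Seq(PingRequest(42, "client-1"), PingResponse(42, 0))
  messages.foreach(m => println("correlationId=" + m.correlationId))
}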
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index d146b14..b40522d 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -93,6 +93,7 @@ object LeaderAndIsrRequest { val correlationId = buffer.getInt val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt + val controllerId = buffer.getInt val controllerEpoch = buffer.getInt val partitionStateInfosCount = buffer.getInt val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo] @@ -110,23 +111,24 @@ object LeaderAndIsrRequest { for (i <- 0 until leadersCount) leaders += Broker.readFrom(buffer) - new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch) + new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders) } } case class LeaderAndIsrRequest (versionId: Short, - correlationId: Int, + override val correlationId: Int, clientId: String, ackTimeoutMs: Int, + controllerId: Int, + controllerEpoch: Int, partitionStateInfos: Map[(String, Int), PartitionStateInfo], - leaders: Set[Broker], - controllerEpoch: Int) - extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { + leaders: Set[Broker]) + extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) { - def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], + def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int, controllerEpoch: Int, correlationId: Int) = { this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, - partitionStateInfos, liveBrokers, controllerEpoch) + controllerId, controllerEpoch, partitionStateInfos, liveBrokers) } def writeTo(buffer: ByteBuffer) { @@ -134,6 +136,7 @@ case class LeaderAndIsrRequest (versionId: Short, buffer.putInt(correlationId) writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) + buffer.putInt(controllerId) buffer.putInt(controllerEpoch) buffer.putInt(partitionStateInfos.size) for((key, value) <- partitionStateInfos){ @@ -151,6 +154,7 @@ case class LeaderAndIsrRequest (versionId: Short, 4 /* correlation id */ + (2 + clientId.length) /* client id */ + 4 /* ack timeout */ + + 4 /* controller id */ + 4 /* controller epoch */ + 4 /* number of partitions */ for((key, value) <- partitionStateInfos) @@ -165,10 +169,11 @@ case class LeaderAndIsrRequest (versionId: Short, val leaderAndIsrRequest = new StringBuilder leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName) leaderAndIsrRequest.append("; Version: " + versionId) + leaderAndIsrRequest.append("; Controller: " + controllerId) + leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch) leaderAndIsrRequest.append("; CorrelationId: " + correlationId) leaderAndIsrRequest.append("; ClientId: " + clientId) leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") - leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch) leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(",")) leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(",")) leaderAndIsrRequest.toString() diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala 
b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala index dbd85d0..b4cfae8 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala @@ -41,10 +41,10 @@ object LeaderAndIsrResponse { } -case class LeaderAndIsrResponse(correlationId: Int, +case class LeaderAndIsrResponse(override val correlationId: Int, responseMap: Map[(String, Int), Short], errorCode: Short = ErrorMapping.NoError) - extends RequestOrResponse { + extends RequestOrResponse(correlationId = correlationId) { def sizeInBytes(): Int ={ var size = 4 /* correlation id */ + diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 6360a98..32ebfd4 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int) case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], versionId: Short = OffsetRequest.CurrentVersion, - correlationId: Int = 0, + override val correlationId: Int = 0, clientId: String = OffsetRequest.DefaultClientId, replicaId: Int = Request.OrdinaryConsumerId) - extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) { + extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) { def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId) diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala index 264e200..08dc3cd 100644 --- a/core/src/main/scala/kafka/api/OffsetResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -47,9 +47,9 @@ object OffsetResponse { case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) -case class OffsetResponse(correlationId: Int, +case class OffsetResponse(override val correlationId: Int, partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse]) - extends RequestOrResponse { + extends RequestOrResponse(correlationId = correlationId) { lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic) diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 916fb59..1e05d7e 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -54,12 +54,12 @@ object ProducerRequest { } case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, - correlationId: Int, + override val correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) - extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { + extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) { /** * Partitions the data into a map of maps (one for each topic). 
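Note: the hunks that follow thread a shared logger named "state.change.logger" through Partition, the controller state machines, and ReplicaManager. Every participant looks up the same named log4j logger, so the config/log4j.properties entries added above route all leadership-transition messages into one file regardless of which class emitted them. A minimal sketch of the pattern, assuming log4j 1.x (the StateChangeDemo class is illustrative, not part of the patch):

import org.apache.log4j.Logger

class StateChangeDemo(brokerId: Int) {
  // The name must match the logger configured in config/log4j.properties:
  //   log4j.logger.state.change.logger=TRACE, stateChangeAppender
  //   log4j.additivity.state.change.logger=false
  private val stateChangeLogger = Logger.getLogger("state.change.logger")

  def traceTransition(topic: String, partition: Int, correlationId: Int): Unit =
    stateChangeLogger.trace("Broker %d starting become-leader transition for partition [%s,%d] with correlationId %d"
      .format(brokerId, topic, partition, correlationId))
}

Setting additivity to false keeps these entries out of the main server log, and the fixed "[%s,%d]" partition format is what the new StateChangeLogMerger tool's topicPartitionRegex matches when merging logs from several brokers.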
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 5bff709..d59c5bb 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -43,8 +43,9 @@ object ProducerResponse { case class ProducerResponseStatus(error: Short, offset: Long) -case class ProducerResponse(correlationId: Int, - status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse { +case class ProducerResponse(override val correlationId: Int, + status: Map[TopicAndPartition, ProducerResponseStatus]) + extends RequestOrResponse(correlationId = correlationId) { /** * Partitions the status map into a map of maps (one for each topic). diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index 3175e1c..b62330b 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -27,7 +27,7 @@ object Request { } -private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging{ +private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging{ def sizeInBytes: Int diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index be3c7be..5107488 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -53,13 +53,13 @@ object StopReplicaRequest extends Logging { } case class StopReplicaRequest(versionId: Short, - correlationId: Int, + override val correlationId: Int, clientId: String, ackTimeoutMs: Int, deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) - extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { + extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) { def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = { this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index fa66b99..c82eadd 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -42,9 +42,10 @@ object StopReplicaResponse { } -case class StopReplicaResponse(val correlationId: Int, +case class StopReplicaResponse(override val correlationId: Int, val responseMap: Map[(String, Int), Short], - val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ + val errorCode: Short = ErrorMapping.NoError) + extends RequestOrResponse(correlationId = correlationId) { def sizeInBytes(): Int ={ var size = 4 /* correlation id */ + diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 88007b1..7477cfd 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging { } case class TopicMetadataRequest(val versionId: Short, - val correlationId: Int, + override val correlationId: Int, val clientId: String, val topics: Seq[String]) - extends 
RequestOrResponse(Some(RequestKeys.MetadataKey)){ + extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){ def this(topics: Seq[String], correlationId: Int) = this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics) diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index af76776..290f263 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -34,7 +34,8 @@ object TopicMetadataResponse { } case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata], - correlationId: Int) extends RequestOrResponse { + override val correlationId: Int) + extends RequestOrResponse(correlationId = correlationId) { val sizeInBytes: Int = { val brokers = extractBrokers(topicsMetadata).values 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 469ac79..07f326f 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -25,6 +25,7 @@ import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup import kafka.common.ErrorMapping import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} +import org.apache.log4j.Logger /** @@ -51,7 +52,8 @@ class Partition(val topic: String, * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for * each partition. */ private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 - this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId) + this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId) + private val stateChangeLogger = Logger.getLogger("state.change.logger") private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) @@ -124,15 +126,17 @@ class Partition(val topic: String, * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) * 4. set the new leader and ISR */ - def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = { + def makeLeader(controllerId: Int, topic: String, partitionId: Int, + leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr if (leaderEpoch >= leaderAndIsr.leaderEpoch){ - info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request" - .format(leaderEpoch, leaderAndIsr.leaderEpoch)) + stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, + partitionId, leaderEpoch, leaderAndIsr.leaderEpoch)) return false } - trace("Started to become leader at the request %s".format(leaderAndIsr.toString())) // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch @@ -159,22 +163,21 @@ class Partition(val topic: String, * 3. set the leader and set ISR to empty * 4. start a fetcher to the new leader */ - def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - liveBrokers: Set[Broker]): Boolean = { + def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, + liveBrokers: Set[Broker], correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr - if (leaderEpoch >= leaderAndIsr.leaderEpoch){ - info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request" - .format(leaderEpoch, leaderAndIsr.leaderEpoch)) + if (leaderEpoch >= leaderAndIsr.leaderEpoch) { + stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, + partitionId, leaderEpoch, leaderAndIsr.leaderEpoch)) return false } - trace("Started to become follower at the request %s".format(leaderAndIsr.toString())) // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch val newLeaderBrokerId: Int = leaderAndIsr.leader - info("Starting the follower state transition to follow leader %d for topic %s partition %d" - .format(newLeaderBrokerId, topic, partitionId)) liveBrokers.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => // stop fetcher thread to previous leader @@ -189,8 +192,9 @@ class Partition(val topic: String, // start fetcher thread to current leader replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) case None => // leader went down - warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) + - " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId)) + stateChangeLogger.trace(("Broker %d aborted the become-follower state change since leader %d for partition [%s,%d] " + + "became unavailable during the state change operation") + .format(localBrokerId, newLeaderBrokerId, topic, partitionId)) } true } @@ -198,7 +202,7 @@ class Partition(val topic: String, def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { leaderIsrUpdateLock synchronized { - debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId)) + debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId)) val replica = getOrCreateReplica(replicaId) replica.logEndOffset = offset diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index 5bd8f6b..c1b9f65 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -20,7 +20,7 @@ package kafka.common import util.matching.Regex object Topic
{ - private val legalChars = "[a-zA-Z0-9\\._\\-]" + val legalChars = "[a-zA-Z0-9\\._\\-]" private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index e2ca1d6..2e50b8d 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -24,15 +24,18 @@ import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} import kafka.server.KafkaConfig import collection.mutable import kafka.api._ +import org.apache.log4j.Logger class ControllerChannelManager private (config: KafkaConfig) extends Logging { + private var controllerContext: ControllerContext = null private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object - this.logIdent = "[Channel manager on controller " + config.brokerId + "], " + this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " - def this(allBrokers: Set[Broker], config : KafkaConfig) { + def this(controllerContext: ControllerContext, config : KafkaConfig) { this(config) - allBrokers.foreach(addNewBroker(_)) + this.controllerContext = controllerContext + controllerContext.liveBrokers.foreach(addNewBroker(_)) } def startup() = { @@ -82,7 +85,7 @@ class ControllerChannelManager private (config: KafkaConfig) extends Logging { BlockingChannel.UseDefaultBufferSize, config.controllerSocketTimeoutMs) channel.connect() - val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel) + val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker.id, messageQueue, channel) requestThread.setDaemon(false) brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread)) } @@ -105,11 +108,13 @@ class ControllerChannelManager private (config: KafkaConfig) extends Logging { } class RequestSendThread(val controllerId: Int, + val controllerContext: ControllerContext, val toBrokerId: Int, val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], val channel: BlockingChannel) extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) { private val lock = new Object() + private val stateChangeLogger = Logger.getLogger("state.change.logger") override def doWork(): Unit = { val queueItem = queue.take() @@ -129,7 +134,8 @@ class RequestSendThread(val controllerId: Int, case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer) } - trace("Controller %d request to broker %d got a response %s".format(controllerId, toBrokerId, response)) + stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d" + .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId)) if(callback != null){ callback(response) @@ -143,11 +149,12 @@ class RequestSendThread(val controllerId: Int, } } -class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit) +class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int) extends Logging { val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]] val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] 
val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] + private val stateChangeLogger = Logger.getLogger("state.change.logger") def newBatch() { // raise error if the previous batch is not empty @@ -162,10 +169,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) { brokerIds.foreach { brokerId => - leaderAndIsrRequestMap.getOrElseUpdate(brokerId, - new mutable.HashMap[(String, Int), PartitionStateInfo]) - leaderAndIsrRequestMap(brokerId).put((topic, partition), - PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor)) + leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) + leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor)) } } @@ -190,8 +195,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques val partitionStateInfos = m._2.toMap val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet val leaders = liveBrokers.filter(b => leaderIds.contains(b.id)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch, correlationId) - debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest)) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId) + for (p <- partitionStateInfos) { + val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker, + p._1._1, p._1._2)) + } sendRequest(broker, leaderAndIsrRequest, null) } leaderAndIsrRequestMap.clear() diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 48eae7e..e18ab07 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -89,7 +89,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) - private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest) + private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId) registerControllerChangedListener() newGauge( @@ -491,7 +491,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } private def startChannelManager() { - controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext.liveBrokers, config) + controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config) controllerContext.controllerChannelManager.startup() } diff --git 
a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 4078604..cd51acc 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -24,6 +24,7 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOff import kafka.utils.{Logging, ZkUtils} import org.I0Itec.zkclient.IZkChildListener import org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.apache.log4j.Logger /** * This class represents the state machine for partitions. It defines the states that a partition can be in, and @@ -38,13 +39,15 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException * moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition */ class PartitionStateMachine(controller: KafkaController) extends Logging { - this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: " private val controllerContext = controller.controllerContext + private val controllerId = controller.config.brokerId private val zkClient = controllerContext.zkClient var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty - val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest) + val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) private val isShuttingDown = new AtomicBoolean(false) + this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " + private val stateChangeLogger = Logger.getLogger("state.change.logger") /** * Invoked on successful controller election. 
First registers a topic change listener since that triggers all @@ -126,12 +129,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { targetState match { case NewPartition => // pre: partition did not exist before this - // post: partition has been assigned replicas assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) assignReplicasToPartitions(topic, partition) partitionState.put(topicAndPartition, NewPartition) - info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) + - "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","))) + val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") + stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from NotExists to New with assigned replicas %s" + .format(controllerId, controller.epoch, topicAndPartition, assignedReplicas)) + // post: partition has been assigned replicas case OnlinePartition => assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition) partitionState(topicAndPartition) match { @@ -144,27 +148,31 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { electLeaderForPartition(topic, partition, leaderSelector) case _ => // should never come here since illegal previous states are checked above } - info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition, - partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader)) partitionState.put(topicAndPartition, OnlinePartition) + val leader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader + stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d" + .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader)) // post: partition has a leader case OfflinePartition => - // pre: partition should be in Online state + // pre: partition should be in New or Online state assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition) // should be called when the leader for a partition is no longer alive - info("Partition %s state changed from Online to Offline".format(topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline" + .format(controllerId, controller.epoch, topicAndPartition)) partitionState.put(topicAndPartition, OfflinePartition) // post: partition has no alive leader case NonExistentPartition => - // pre: partition could be in either of the above states + // pre: partition should be in Offline state assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition) - info("Partition %s state changed from Offline to NotExists".format(topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists" + .format(controllerId, controller.epoch, topicAndPartition)) partitionState.put(topicAndPartition, NonExistentPartition) // post: partition state is deleted from all brokers and zookeeper } } catch { - case t: Throwable => error("State change for partition %s ".format(topicAndPartition) + - "from %s to %s failed".format(currState, targetState), t) + case t: Throwable => + stateChangeLogger.error("Controller %d epoch %d initiated 
state change for partition %s from %s to %s, which failed" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t) } } @@ -225,9 +233,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { liveAssignedReplicas.size match { case 0 => ControllerStats.offlinePartitionRate.mark() - throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " + - "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition, - replicaAssignment.mkString(","), controllerContext.liveBrokerIds)) + val failMsg = ("During state change of partition %s from NEW to ONLINE, assigned replicas are [%s], " + + "live brokers are [%s]. No assigned replica is alive.") + .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds) + stateChangeLogger.trace("Controller %d epoch %d:".format(controllerId, controller.epoch) + failMsg) + throw new StateChangeFailedException(failMsg) case _ => debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas)) // make the first replica in the list of assigned replicas, the leader @@ -251,9 +261,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition).get ControllerStats.offlinePartitionRate.mark() - throw new StateChangeFailedException("Error while changing partition %s's state from New to Online" - .format(topicAndPartition) + " since Leader and isr path already exists with value " + - "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)) + val failMsg = ("Error while changing partition %s's state from New to Online since LeaderAndIsr path already " + + "exists with value %s and controller epoch %d") + .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch) + stateChangeLogger.trace("Controller %d epoch %d:".format(controllerId, controller.epoch) + failMsg) + throw new StateChangeFailedException(failMsg) } } } @@ -268,7 +280,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) { val topicAndPartition = TopicAndPartition(topic, partition) // handle leader election for the partitions whose leader is no longer alive - info("Electing leader for partition %s".format(topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s" + .format(controllerId, controller.epoch, topicAndPartition)) try { var zookeeperPathUpdateSucceeded: Boolean = false var newLeaderAndIsr: LeaderAndIsr = null @@ -277,10 +290,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition) val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch - if(controllerEpoch > controller.epoch) - throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" + - "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) + - "controller was elected with epoch %d.
Aborting state change by this controller".format(controllerEpoch)) + if (controllerEpoch > controller.epoch) { + val failMsg = ("Controller %d epoch %d aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " + + "already written by another controller. This probably means that the current controller %d went through " + + "a soft failure and another controller was elected with epoch %d.") + .format(controllerId, controller.epoch, topic, partition, controllerId, controllerEpoch) + stateChangeLogger.trace("Controller %d epoch %d:".format(controllerId, controller.epoch) + failMsg) + throw new StateChangeFailedException(failMsg) + } // elect new leader or throw exception val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, @@ -294,7 +311,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch) // update the leader cache controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch) - info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s" + .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition)) // store new leader and isr info in cache brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size) @@ -302,8 +320,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas %s for partition %s are dead." 
.format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","), topicAndPartition) + " Marking this partition offline", poe) - case sce => throw new StateChangeFailedException(("Error while electing leader for partition " + - " %s due to: %s.").format(topicAndPartition, sce.getMessage), sce) + case sce => + val failMsg = "Error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage) + stateChangeLogger.trace("Controller %d epoch %d:".format(controllerId, controller.epoch) + failMsg) + throw new StateChangeFailedException(failMsg, sce) } debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2)))) } @@ -321,8 +341,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match { case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch case None => - throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " + - "%s in %s state".format(topicAndPartition, partitionState(topicAndPartition))) + val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state" + .format(topicAndPartition, partitionState(topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d:".format(controllerId, controller.epoch) + failMsg) + throw new StateChangeFailedException(failMsg) } } @@ -362,7 +384,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } class PartitionChangeListener(topic: String) extends IZkChildListener with Logging { - this.logIdent = "[Controller " + controller.config.brokerId + "], " + this.logIdent = "[Controller " + controller.config.brokerId + "]: " @throws(classOf[Exception]) def handleChildChange(parentPath : String, children : java.util.List[String]) { diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 20d9c4f..43d60cf 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.common.{TopicAndPartition, StateChangeFailedException} import kafka.utils.{ZkUtils, Logging} import org.I0Itec.zkclient.IZkChildListener +import org.apache.log4j.Logger /** * This class represents the state machine for replicas. It defines the states that a replica can be in, and @@ -37,12 +38,14 @@ import org.I0Itec.zkclient.IZkChildListener * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. 
Valid previous state is OfflineReplica */ class ReplicaStateMachine(controller: KafkaController) extends Logging { - this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: " private val controllerContext = controller.controllerContext + private val controllerId = controller.config.brokerId private val zkClient = controllerContext.zkClient var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty - val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest) + val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId) private val isShuttingDown = new AtomicBoolean(false) + this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " + private val stateChangeLogger = Logger.getLogger("state.change.logger") /** * Invoked on successful controller election. First registers a broker change listener since that triggers all @@ -117,17 +120,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case None => // new leader request will be sent to this replica when one gets elected } replicaState.put((topic, partition, replicaId), NewReplica) - info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case NonExistentReplica => assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState) // send stop replica command brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true) // remove this replica from the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - controllerContext.partitionReplicaAssignment.put(topicAndPartition, - currentAssignedReplicas.filterNot(_ == replicaId)) - info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition)) + controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId)) replicaState.remove((topic, partition, replicaId)) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case OnlineReplica => assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState) replicaState((topic, partition, replicaId)) match { @@ -135,7 +139,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { // add this replica to the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) - info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case _ => // check if the leader for this partition is alive or even exists controllerContext.allLeaders.get(topicAndPartition) match { @@ -146,7 +151,8 @@ class 
ReplicaStateMachine(controller: KafkaController) extends Logging { topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OnlineReplica) - info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case false => // ignore partitions whose leader is not alive } case None => // ignore partitions who don't have a leader yet @@ -167,8 +173,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OfflineReplica) - info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition)) - info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition)) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) false case None => true @@ -184,15 +190,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } } catch { - case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) + - "[%s, %d] to %s".format(topic, partition, targetState), t) + case t: Throwable => + stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] to %s, which failed" + .format(controllerId, controller.epoch, replicaId, topic, partition, targetState), t) } } private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState], targetState: ReplicaState) { assert(fromStates.contains(replicaState((topic, partition, replicaId))), - "Replica %s for partition [%s, %d] should be in the %s states before moving to %s state" + "Replica %s for partition [%s,%d] should be in the %s states before moving to %s state" .format(replicaId, topic, partition, fromStates.mkString(","), targetState) + ".
Instead it is in %s state".format(replicaState((topic, partition, replicaId)))) } diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index 3d92569..5f80df7 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -21,9 +21,11 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions class TopicMetadataRequest(val versionId: Short, - val correlationId: Int, + override val correlationId: Int, val clientId: String, - val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) { + val topics: java.util.List[String]) + extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) { + val underlying: kafka.api.TopicMetadataRequest = new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics)) @@ -36,4 +38,5 @@ class TopicMetadataRequest(val versionId: Short, def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer) def sizeInBytes: Int = underlying.sizeInBytes() + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f7fe0de..2dd390e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController +import org.apache.log4j.Logger object ReplicaManager { @@ -42,14 +43,16 @@ class ReplicaManager(val config: KafkaConfig, val logManager: LogManager) extends Logging with KafkaMetricsGroup { /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 + private val localBrokerId = config.brokerId private val allPartitions = new Pool[(String, Int), Partition] private var leaderPartitions = new mutable.HashSet[Partition]() private val leaderPartitionsLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) - this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap private var hwThreadInitialized = false + this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " + private val stateChangeLogger = Logger.getLogger("state.change.logger") newGauge( "LeaderCount", @@ -102,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig, } def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { - trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId)) + stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId)) val errorCode = ErrorMapping.NoError getReplica(topic, partitionId) match { case Some(replica) => @@ -114,10 +117,10 @@ class ReplicaManager(val config: KafkaConfig, leaderPartitions -= replica.partition } allPartitions.remove((topic, partitionId)) - info("After removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions)) + info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, 
allPartitions)) case None => //do nothing if replica no longer exists } - trace("Finish handling stop replica [%s, %d]".format(topic, partitionId)) + stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId)) errorCode } @@ -125,7 +128,7 @@ class ReplicaManager(val config: KafkaConfig, val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(stopReplicaRequest.controllerEpoch < controllerEpoch) { error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) + - " Latest known controller epoch is %d " + controllerEpoch) + " Latest known controller epoch is %d".format(controllerEpoch)) (responseMap, ErrorMapping.StaleControllerEpochCode) } else { controllerEpoch = stopReplicaRequest.controllerEpoch @@ -160,7 +163,7 @@ class ReplicaManager(val config: KafkaConfig, if(replicaOpt.isDefined) return replicaOpt.get else - throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s, %d] yet".format(config.brokerId, topic, partition)) + throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d] yet".format(config.brokerId, topic, partition)) } def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = { @@ -187,13 +190,19 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = { - info("Handling leader and isr request %s".format(leaderAndISRRequest)) + leaderAndISRRequest.partitionStateInfos.foreach(p => + stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]" + .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2))) + info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest)) + val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { - error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) + - " Latest known controller epoch is %d " + controllerEpoch) + stateChangeLogger.error("Broker %d received LeaderAndIsr request correlationId %d with an old controllerEpoch %d, latest known controllerEpoch is %d" + .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) (responseMap, ErrorMapping.StaleControllerEpochCode) }else { + val controllerId = leaderAndISRRequest.controllerId controllerEpoch = leaderAndISRRequest.controllerEpoch for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) { var errorCode = ErrorMapping.NoError @@ -203,17 +212,25 @@ class ReplicaManager(val config: KafkaConfig, val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader try { if(requestedLeaderId == config.brokerId) - makeLeader(topic, partitionId, partitionStateInfo) + makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId) else - makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders) + makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders, + leaderAndISRRequest.correlationId) } catch { case e => - error("Error processing leaderAndISR request
%s".format(leaderAndISRRequest), e) + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + + "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.controllerEpoch, topicAndPartition) + stateChangeLogger.error(errorMsg, e) errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) } responseMap.put(topicAndPartition, errorCode) + leaderAndISRRequest.partitionStateInfos.foreach(p => + stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]" + .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2))) } - info("Completed leader and isr request %s".format(leaderAndISRRequest)) + info("Handled leader and isr request %s".format(leaderAndISRRequest)) // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions if (!hwThreadInitialized) { @@ -225,33 +242,38 @@ class ReplicaManager(val config: KafkaConfig, } } - private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = { + private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int, + partitionStateInfo: PartitionStateInfo, correlationId: Int) = { val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch - info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) + stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-leader transition for partition [%s,%d]") + .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) - if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) { + if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) { // also add this partition to the list of partitions for which the leader is the current broker leaderPartitionsLock synchronized { leaderPartitions += partition } } - info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId)) + stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId, topic, partitionId)) } - private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, - liveBrokers: Set[Broker]) { + private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int, + partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) { val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader - info("Starting the follower state transition to follow leader %d for topic %s partition %d" - .format(leaderBrokerId, topic, partitionId)) + stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-follower transition for partition [%s,%d]") + .format(localBrokerId, correlationId, controllerId, epoch, topic, 
partitionId)) val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) - if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) { + if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers, correlationId)) { // remove this replica's partition from the ISR expiration queue leaderPartitionsLock synchronized { leaderPartitions -= partition } } + stateChangeLogger.trace("Broker %d completed the become-follower transition for partition [%s,%d]".format(localBrokerId, topic, partitionId)) } private def maybeShrinkIsr(): Unit = { @@ -266,7 +288,7 @@ class ReplicaManager(val config: KafkaConfig, if(partitionOpt.isDefined) { partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset) } else { - warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + warn("While recording the follower position, the partition [%s,%d] hasn't been created, skipping the leader HW update".format(topic, partitionId)) } } diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala new file mode 100644 index 0000000..97970fb --- /dev/null +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import joptsimple._ +import scala.util.matching.Regex +import collection.mutable +import java.util.Date +import java.text.SimpleDateFormat +import kafka.utils.Logging +import kafka.common.Topic +import java.io.{BufferedOutputStream, OutputStream} + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility requires exactly one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged */ + +object StateChangeLogMerger extends Logging { + + val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" + val topicPartitionRegex = new Regex("\\[(" + Topic.legalChars + "+),( )*([0-9]+)\\]") + val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}") + val dateFormat = new SimpleDateFormat(dateFormatString) + var files: List[String] = List() + var topic: String = null + var partitions: List[Int] = List() + var startDate: Date = null + var endDate: Date = null + + def main(args: Array[String]) { + + // Parse input arguments. + val parser = new OptionParser + val filesOpt = parser.accepts("logs", "Comma separated list of state change log files to be merged") + .withRequiredArg + .describedAs("file1,file2,...") + .ofType(classOf[String]) + val regexOpt = parser.accepts("logs-regex", "Regex to match the state change log files to be merged") + .withRequiredArg + .describedAs("for example: /tmp/state-change.log*") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "The topic whose state change logs should be merged") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids whose state change logs should be merged") + .withRequiredArg + .describedAs("0,1,2,...") + .ofType(classOf[String]) + val startTimeOpt = parser.accepts("start-time", "The earliest timestamp of state change log entries to be merged") + .withRequiredArg + .describedAs("start timestamp in the format " + dateFormatString) + .ofType(classOf[String]) + .defaultsTo("0000-00-00 00:00:00,000") + val endTimeOpt = parser.accepts("end-time", "The latest timestamp of state change log entries to be merged") + .withRequiredArg + .describedAs("end timestamp in the format " + dateFormatString) + .ofType(classOf[String]) + .defaultsTo("9999-12-31 23:59:59,999") + + + val options = parser.parse(args : _*) + if ((!options.has(filesOpt) && !options.has(regexOpt)) || (options.has(filesOpt) && options.has(regexOpt))) { + System.err.println("Provide arguments to exactly one of the two options \"" + filesOpt + "\" or \"" + regexOpt + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + if (options.has(partitionsOpt) && !options.has(topicOpt)) { + System.err.println("The option \"" + topicOpt + "\" must also be specified when providing partition ids") + parser.printHelpOn(System.err) + System.exit(1) + } + + // Populate data structures. + if (options.has(filesOpt)) { + files :::= options.valueOf(filesOpt).split(",").toList + } else if (options.has(regexOpt)) { + val regex = options.valueOf(regexOpt) + val fileNameIndex = regex.lastIndexOf('/') + 1 + val dirName = if (fileNameIndex == 0) "." else regex.substring(0, fileNameIndex - 1) + val fileNameRegex = new Regex(regex.substring(fileNameIndex)) + files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName) != None).map(dirName + "/" + _.getName).toList + } + if (options.has(topicOpt)) { + topic = options.valueOf(topicOpt) + } + if (options.has(partitionsOpt)) { + partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt) + } + startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim) + endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim) + + /** + * n-way merge from m input files: + * 1. 
Read the first line that matches the specified topic/partitions and date range from every input file and add it to a priority queue. + * 2. Take the line with the earliest date from the queue and write it to a buffered output stream. + * 3. Read the next matching line from the file selected in step 2 and add it to the priority queue. + * 4. Flush the output buffer at the end. (The buffer is also flushed automatically whenever it fills up.) + */ + val pqueue = new mutable.PriorityQueue[LineIterator]()(dateBasedOrdering) + val output: OutputStream = new BufferedOutputStream(System.out, 1024*1024) + val lineIterators = files.map(io.Source.fromFile(_).getLines) + var lines: List[LineIterator] = List() + + for (itr <- lineIterators) { + val lineItr = getNextLine(itr) + if (!lineItr.isEmpty) + lines ::= lineItr + } + if (!lines.isEmpty) pqueue.enqueue(lines:_*) + + while (!pqueue.isEmpty) { + val lineItr = pqueue.dequeue() + output.write((lineItr.line + "\n").getBytes) + val nextLineItr = getNextLine(lineItr.itr) + if (!nextLineItr.isEmpty) + pqueue.enqueue(nextLineItr) + } + + output.flush() + } + + /** + * Returns the next line from the given iterator that matches the specified topic/partitions and falls within + * the specified date range. + * @param itr Line iterator of a file + * @return a LineIterator wrapping the matching line and its source iterator, or an empty LineIterator if the + * file has no more matching lines + */ + def getNextLine(itr: Iterator[String]): LineIterator = { + while (itr != null && itr.hasNext) { + val nextLine = itr.next + dateRegex.findFirstIn(nextLine) match { + case Some(d) => + val date = dateFormat.parse(d) + if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) { + topicPartitionRegex.findFirstMatchIn(nextLine) match { + case Some(matcher) => + if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt))) + return new LineIterator(nextLine, itr) + case None => + } + } + case None => + } + } + new LineIterator() + } + + class LineIterator(val line: String, val itr: Iterator[String]) { + def this() = this("", null) + def isEmpty = (line == "" && itr == null) + } + + // Orders lines by their leading timestamp, reversed: scala's PriorityQueue is a max-heap, so the reversed + // comparison makes dequeue() return the line with the earliest timestamp first. Comparing the raw timestamp + // strings is safe because the fixed-width, zero-padded date format sorts lexicographically. + implicit object dateBasedOrdering extends Ordering[LineIterator] { + def compare(first: LineIterator, second: LineIterator) = { + val firstDate = dateRegex.findFirstIn(first.line).get + val secondDate = dateRegex.findFirstIn(second.line).get + secondDate.compareTo(firstDate) + } + } + +} \ No newline at end of file
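As a usage note, here is a minimal sketch of how the new merge tool might be driven. The option names and the timestamp format come from the StateChangeLogMerger definitions above; the log file paths, topic name, and partition ids are hypothetical examples only.

// Hypothetical driver for kafka.tools.StateChangeLogMerger; paths and topic are examples, not part of the patch.
object StateChangeLogMergerExample {
  def main(args: Array[String]) {
    // Merge the state change logs of two brokers for partitions 0 and 1 of one topic,
    // restricted to a single day; the merged, time-ordered lines are written to stdout.
    kafka.tools.StateChangeLogMerger.main(Array(
      "--logs", "/tmp/broker1/state-change.log,/tmp/broker2/state-change.log", // exactly one of --logs / --logs-regex
      "--topic", "mytopic",                      // optional: restrict to a single topic
      "--partitions", "0,1",                     // optional: requires --topic
      "--start-time", "2012-10-01 00:00:00,000", // inclusive, format yyyy-MM-dd HH:mm:ss,SSS
      "--end-time", "2012-10-02 00:00:00,000"))  // inclusive
  }
}

Since the tool is a plain main class, the same arguments should also work from the shell via bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger.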
diff --git a/core/src/main/scala/kafka/utils/Topic.scala b/core/src/main/scala/kafka/utils/Topic.scala deleted file mode 100644 index e69de29..0000000 diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index d0c7b90..4c209f1 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -89,7 +89,7 @@ object SerializationTestUtils{ val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1) val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)), ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3))) - new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1, 0) + new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0) } def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 129bc56..ec1db2d 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -23,7 +23,7 @@ import kafka.admin.CreateTopicCommand import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} -import kafka.controller.{LeaderIsrAndControllerEpoch, ControllerChannelManager} +import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager} import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.api._ @@ -120,18 +120,20 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) assertEquals("First epoch value should be 0", 0, leaderEpoch1) - // start another controller - val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort())) + val controllerId = 2 + val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) - val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig) + val controllerContext = new ControllerContext(zkClient) + controllerContext.liveBrokers = brokers.toSet + val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig) controllerChannelManager.startup() val staleControllerEpoch = 0 val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch] leaderAndIsr.put((topic, partitionId), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2)) val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch, 0) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0) controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback) TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)