From 3b0ad4f2d810f8d423de4b805d2bc1d844d79126 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 6 Apr 2015 17:34:49 -0700 Subject: [PATCH] rename BrokerEndpoint to BrokerEndPoint --- core/src/main/scala/kafka/admin/AdminUtils.scala | 8 ++++---- core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala | 8 ++++---- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala | 10 +++++----- core/src/main/scala/kafka/api/TopicMetadata.scala | 12 ++++++------ core/src/main/scala/kafka/api/TopicMetadataResponse.scala | 6 +++--- core/src/main/scala/kafka/api/UpdateMetadataRequest.scala | 4 ++-- core/src/main/scala/kafka/client/ClientUtils.scala | 10 +++++----- core/src/main/scala/kafka/cluster/Broker.scala | 6 +++--- core/src/main/scala/kafka/cluster/BrokerEndPoint.scala | 12 ++++++------ .../main/scala/kafka/consumer/ConsumerFetcherManager.scala | 6 +++--- .../main/scala/kafka/consumer/ConsumerFetcherThread.scala | 4 ++-- .../main/scala/kafka/javaapi/ConsumerMetadataResponse.scala | 4 ++-- core/src/main/scala/kafka/javaapi/TopicMetadata.scala | 8 ++++---- core/src/main/scala/kafka/producer/ProducerPool.scala | 6 +++--- .../src/main/scala/kafka/server/AbstractFetcherManager.scala | 8 ++++---- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++-- core/src/main/scala/kafka/server/MetadataCache.scala | 8 ++++---- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala | 4 ++-- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 4 ++-- core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++-- .../src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 6 +++--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala | 6 +++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- .../unit/kafka/api/RequestResponseSerializationTest.scala | 4 ++-- .../test/scala/unit/kafka/cluster/BrokerEndPointTest.scala | 4 ++-- .../scala/unit/kafka/integration/TopicMetadataTest.scala | 4 ++-- .../test/scala/unit/kafka/producer/AsyncProducerTest.scala | 8 ++++---- 27 files changed, 85 insertions(+), 85 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 0d3332e..eee80f9 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,7 +18,7 @@ package kafka.admin import kafka.common._ -import kafka.cluster.{BrokerEndpoint, Broker} +import kafka.cluster.{BrokerEndPoint, Broker} import kafka.log.LogConfig import kafka.utils._ @@ -356,9 +356,9 @@ object AdminUtils extends Logging { val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - var leaderInfo: Option[BrokerEndpoint] = None - var replicaInfo: Seq[BrokerEndpoint] = Nil - var isrInfo: Seq[BrokerEndpoint] = Nil + var leaderInfo: Option[BrokerEndPoint] = None + var replicaInfo: Seq[BrokerEndPoint] = Nil + var isrInfo: Seq[BrokerEndPoint] = Nil try { leaderInfo = leader match { case Some(l) => diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala index d2a3d43..ea1c0d0 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala @@ -18,18 +18,18 @@ package kafka.api import java.nio.ByteBuffer -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.common.ErrorMapping object ConsumerMetadataResponse { val CurrentVersion = 0 - private val NoBrokerEndpointOpt = Some(BrokerEndpoint(id = -1, host = "", port = -1)) + private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1)) def readFrom(buffer: ByteBuffer) = { val correlationId = buffer.getInt val errorCode = buffer.getShort - val broker = BrokerEndpoint.readFrom(buffer) + val broker = BrokerEndPoint.readFrom(buffer) val coordinatorOpt = if (errorCode == ErrorMapping.NoError) Some(broker) else @@ -40,7 +40,7 @@ object ConsumerMetadataResponse { } -case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndpoint], errorCode: Short, correlationId: Int) +case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int) extends RequestOrResponse() { def sizeInBytes = diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index bf93632..2fad585 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -21,7 +21,7 @@ package kafka.api import java.nio._ import kafka.utils._ import kafka.api.ApiUtils._ -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.controller.LeaderIsrAndControllerEpoch import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping @@ -120,9 +120,9 @@ object LeaderAndIsrRequest { } val leadersCount = buffer.getInt - var leaders = Set[BrokerEndpoint]() + var leaders = Set[BrokerEndPoint]() for (i <- 0 until leadersCount) - leaders += BrokerEndpoint.readFrom(buffer) + leaders += BrokerEndPoint.readFrom(buffer) new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders) } @@ -134,10 +134,10 @@ case class LeaderAndIsrRequest (versionId: Short, controllerId: Int, controllerEpoch: Int, partitionStateInfos: Map[(String, Int), PartitionStateInfo], - leaders: Set[BrokerEndpoint]) + leaders: Set[BrokerEndPoint]) extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { - def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndpoint], controllerId: Int, + def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndPoint], controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String) = { this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos, leaders) diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index 6de447d..5e39f45 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -17,7 +17,7 @@ package kafka.api -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.utils.Logging @@ -27,7 +27,7 @@ object TopicMetadata { val NoLeaderNodeId = -1 - def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): TopicMetadata = { + def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val topic = readShortString(buffer) val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) @@ -88,7 +88,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat object PartitionMetadata { - def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): PartitionMetadata = { + def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): PartitionMetadata = { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ val leaderId = buffer.getInt @@ -109,9 +109,9 @@ object PartitionMetadata { } case class PartitionMetadata(partitionId: Int, - val leader: Option[BrokerEndpoint], - replicas: Seq[BrokerEndpoint], - isr: Seq[BrokerEndpoint] = Seq.empty, + val leader: Option[BrokerEndPoint], + replicas: Seq[BrokerEndPoint], + isr: Seq[BrokerEndPoint] = Seq.empty, errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { 2 /* error code */ + diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index 776b604..f2f89e0 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -17,7 +17,7 @@ package kafka.api -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import java.nio.ByteBuffer object TopicMetadataResponse { @@ -25,7 +25,7 @@ object TopicMetadataResponse { def readFrom(buffer: ByteBuffer): TopicMetadataResponse = { val correlationId = buffer.getInt val brokerCount = buffer.getInt - val brokers = (0 until brokerCount).map(_ => BrokerEndpoint.readFrom(buffer)) + val brokers = (0 until brokerCount).map(_ => BrokerEndPoint.readFrom(buffer)) val brokerMap = brokers.map(b => (b.id, b)).toMap val topicCount = buffer.getInt val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap)) @@ -33,7 +33,7 @@ object TopicMetadataResponse { } } -case class TopicMetadataResponse(brokers: Seq[BrokerEndpoint], +case class TopicMetadataResponse(brokers: Seq[BrokerEndPoint], topicsMetadata: Seq[TopicMetadata], correlationId: Int) extends RequestOrResponse() { diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index a91e3a6..69f0397 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio.ByteBuffer import kafka.api.ApiUtils._ -import kafka.cluster.{Broker, BrokerEndpoint} +import kafka.cluster.{Broker, BrokerEndPoint} import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition} import kafka.network.RequestChannel.Response import kafka.network.{BoundedByteBufferSend, RequestChannel} @@ -53,7 +53,7 @@ object UpdateMetadataRequest { val numAliveBrokers = buffer.getInt val aliveBrokers = versionId match { - case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndpoint.readFrom(buffer),SecurityProtocol.PLAINTEXT) + case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndPoint.readFrom(buffer),SecurityProtocol.PLAINTEXT) case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer) case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.") } diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index f08aaf2..b66424b 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -43,7 +43,7 @@ object ClientUtils extends Logging{ * @param producerConfig The producer's config * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { + def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) @@ -84,7 +84,7 @@ object ClientUtils extends Logging{ * @param clientId The client's identifier * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], clientId: String, timeoutMs: Int, + def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], clientId: String, timeoutMs: Int, correlationId: Int = 0): TopicMetadataResponse = { val props = new Properties() props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(",")) @@ -97,11 +97,11 @@ object ClientUtils extends Logging{ /** * Parse a list of broker urls in the form host1:port1, host2:port2, ... */ - def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = { + def parseBrokerList(brokerListStr: String): Seq[BrokerEndPoint] = { val brokersStr = CoreUtils.parseCsvList(brokerListStr) brokersStr.zipWithIndex.map { case (address, brokerId) => - BrokerEndpoint.createBrokerEndPoint(brokerId, address) + BrokerEndPoint.createBrokerEndPoint(brokerId, address) } } @@ -144,7 +144,7 @@ object ClientUtils extends Logging{ while (!offsetManagerChannelOpt.isDefined) { - var coordinatorOpt: Option[BrokerEndpoint] = None + var coordinatorOpt: Option[BrokerEndPoint] = None while (!coordinatorOpt.isDefined) { try { diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 8e603b6..79e16c1 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -110,7 +110,7 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) { this(id, Map(protocol -> EndPoint(host, port, protocol))) } - def this(bep: BrokerEndpoint, protocol: SecurityProtocol) = { + def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = { this(bep.id, bep.host, bep.port, protocol) } @@ -132,10 +132,10 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) { endPoints.contains(protocolType) } - def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndpoint = { + def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = { val endpoint = endPoints.get(protocolType) endpoint match { - case Some(endpoint) => new BrokerEndpoint(id, endpoint.host, endpoint.port) + case Some(endpoint) => new BrokerEndPoint(id, endpoint.host, endpoint.port) case None => throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)) } diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala index 22dba18..3395108 100644 --- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala +++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala @@ -22,24 +22,24 @@ import kafka.api.ApiUtils._ import kafka.common.KafkaException import org.apache.kafka.common.utils.Utils._ -object BrokerEndpoint { - def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndpoint = { +object BrokerEndPoint { + def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndPoint = { // BrokerEndPoint URI is host:port or [ipv6_host]:port // Note that unlike EndPoint (or listener) this URI has no security information. val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r connectionString match { - case uriParseExp(host, port) => new BrokerEndpoint(brokerId, host, port.toInt) + case uriParseExp(host, port) => new BrokerEndPoint(brokerId, host, port.toInt) case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint") } } - def readFrom(buffer: ByteBuffer): BrokerEndpoint = { + def readFrom(buffer: ByteBuffer): BrokerEndPoint = { val brokerId = buffer.getInt() val host = readShortString(buffer) val port = buffer.getInt() - BrokerEndpoint(brokerId, host, port) + BrokerEndPoint(brokerId, host, port) } } @@ -50,7 +50,7 @@ object BrokerEndpoint { * Clients should know which security protocol to use from configuration. * This allows us to keep the wire protocol with the clients unchanged where the protocol is not needed. */ -case class BrokerEndpoint(id: Int, host: String, port: Int) { +case class BrokerEndPoint(id: Int, host: String, port: Int) { def connectionString(): String = formatAddress(host, port) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 6bb0d56..49b683f 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -19,7 +19,7 @@ package kafka.consumer import org.I0Itec.zkclient.ZkClient import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager} -import kafka.cluster.{BrokerEndpoint, Cluster} +import kafka.cluster.{BrokerEndPoint, Cluster} import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.immutable import collection.mutable.HashMap @@ -53,7 +53,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, private class LeaderFinderThread(name: String) extends ShutdownableThread(name) { // thread responsible for adding the fetcher to the right broker when leader is available override def doWork() { - val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndpoint] + val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndPoint] lock.lock() try { while (noLeaderPartitionSet.isEmpty) { @@ -114,7 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } } - override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { new ConsumerFetcherThread( "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, partitionMap, this) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index cde4481..33ea728 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -17,7 +17,7 @@ package kafka.consumer -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData} @@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition class ConsumerFetcherThread(name: String, val config: ConsumerConfig, - sourceBroker: BrokerEndpoint, + sourceBroker: BrokerEndPoint, partitionMap: Map[TopicAndPartition, PartitionTopicInfo], val consumerFetcherManager: ConsumerFetcherManager) extends AbstractFetcherThread(name = name, diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala index 9c14428..4345a8e 100644 --- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala @@ -18,13 +18,13 @@ package kafka.javaapi import java.nio.ByteBuffer -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) { def errorCode = underlying.errorCode - def coordinator: BrokerEndpoint = { + def coordinator: BrokerEndPoint = { import kafka.javaapi.Implicits._ underlying.coordinatorOpt } diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index ebbd589..4ef8321 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -16,7 +16,7 @@ */ package kafka.javaapi -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import scala.collection.JavaConversions private[javaapi] object MetadataListImplicits { @@ -52,17 +52,17 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) { class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) { def partitionId: Int = underlying.partitionId - def leader: BrokerEndpoint = { + def leader: BrokerEndPoint = { import kafka.javaapi.Implicits._ underlying.leader } - def replicas: java.util.List[BrokerEndpoint] = { + def replicas: java.util.List[BrokerEndPoint] = { import JavaConversions._ underlying.replicas } - def isr: java.util.List[BrokerEndpoint] = { + def isr: java.util.List[BrokerEndPoint] = { import JavaConversions._ underlying.isr } diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 07feb05..5ad6812 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -20,7 +20,7 @@ package kafka.producer import java.util.Properties import kafka.api.TopicMetadata -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.common.UnavailableProducerException import kafka.utils.Logging @@ -31,7 +31,7 @@ object ProducerPool { /** * Used in ProducerPool to initiate a SyncProducer connection with a broker. */ - def createSyncProducer(config: ProducerConfig, broker: BrokerEndpoint): SyncProducer = { + def createSyncProducer(config: ProducerConfig, broker: BrokerEndPoint): SyncProducer = { val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) @@ -45,7 +45,7 @@ class ProducerPool(val config: ProducerConfig) extends Logging { private val lock = new Object() def updateProducer(topicMetadata: Seq[TopicMetadata]) { - val newBrokers = new collection.mutable.HashSet[BrokerEndpoint] + val newBrokers = new collection.mutable.HashSet[BrokerEndPoint] topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index f8f9331..ec40516 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.collection.Set import scala.collection.Map import kafka.utils.Logging -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import com.yammer.metrics.core.Gauge @@ -69,7 +69,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri } // to be defined in subclass to create a specific fetcher - def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread + def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) { mapLock synchronized { @@ -127,6 +127,6 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri } } -case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int) +case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int) -case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long) +case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 1e26de2..f178527 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.utils.{Pool, ShutdownableThread} import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} @@ -37,7 +37,7 @@ import com.yammer.metrics.core.Gauge * Abstract class for fetching data from multiple partitions from the same broker. */ -abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndpoint, socketTimeout: Int, socketBufferSize: Int, +abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 4460b42..9a9205f 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.cluster.{BrokerEndpoint,Broker} +import kafka.cluster.{BrokerEndPoint,Broker} import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException} import kafka.common.TopicAndPartition @@ -54,10 +54,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { val partitionMetadata = partitionStateInfos.map { case (partitionId, partitionState) => val replicas = partitionState.allReplicas - val replicaInfo: Seq[BrokerEndpoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol)) - var leaderInfo: Option[BrokerEndpoint] = None + val replicaInfo: Seq[BrokerEndPoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol)) + var leaderInfo: Option[BrokerEndPoint] = None var leaderBrokerInfo: Option[Broker] = None - var isrInfo: Seq[BrokerEndpoint] = Nil + var isrInfo: Seq[BrokerEndPoint] = Nil val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch val leader = leaderIsrAndEpoch.leaderAndIsr.leader val isr = leaderIsrAndEpoch.leaderAndIsr.isr diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index f0a2a5b..ef38ed3 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -17,13 +17,13 @@ package kafka.server -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, "Replica", brokerConfig.numReplicaFetchers) { - override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr) } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b2196c8..2d84afa 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -18,14 +18,14 @@ package kafka.server import kafka.admin.AdminUtils -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet import kafka.api.{OffsetRequest, FetchResponsePartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, - sourceBroker: BrokerEndpoint, + sourceBroker: BrokerEndPoint, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) extends AbstractFetcherThread(name = name, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 144a15e..b06f00b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.api._ import kafka.common._ import kafka.utils._ -import kafka.cluster.{BrokerEndpoint, Partition, Replica} +import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.log.{LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController @@ -684,7 +684,7 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - leaders: Set[BrokerEndpoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], + leaders: Set[BrokerEndPoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], offsetManager: OffsetManager) { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index d1050b4..1366172 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -18,7 +18,7 @@ package kafka.tools import joptsimple.OptionParser -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference @@ -197,7 +197,7 @@ private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, c private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], leadersPerBroker: Map[Int, Seq[TopicAndPartition]], expectedNumFetchers: Int, - brokerMap: Map[Int, BrokerEndpoint], + brokerMap: Map[Int, BrokerEndPoint], initialOffsetTime: Long, reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, Long] @@ -335,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } } -private class ReplicaFetcher(name: String, sourceBroker: BrokerEndpoint, topicAndPartitions: Iterable[TopicAndPartition], +private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAndPartitions: Iterable[TopicAndPartition], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) extends ShutdownableThread(name) { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 9a6804c..dec9516 100755 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -22,7 +22,7 @@ import kafka.utils._ import kafka.consumer._ import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} -import kafka.cluster.BrokerEndpoint +import kafka.cluster.BrokerEndPoint import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition import org.apache.kafka.common.utils.Utils @@ -143,8 +143,8 @@ object SimpleConsumerShell extends Logging { } // validating replica id and initializing target broker - var fetchTargetBroker: BrokerEndpoint = null - var replicaOpt: Option[BrokerEndpoint] = null + var fetchTargetBroker: BrokerEndPoint = null + var replicaOpt: Option[BrokerEndPoint] = null if(replicaId == UseLeaderReplica) { replicaOpt = partitionMetadataOpt.get.leader if(!replicaOpt.isDefined) { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 82b0e33..b03172a 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -83,7 +83,7 @@ object ZkUtils extends Logging { brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) } - def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndpoint] = { + def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndPoint] = { getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType)) } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 83910f3..566b538 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -18,7 +18,7 @@ package kafka.api -import kafka.cluster.{BrokerEndpoint, EndPoint, Broker} +import kafka.cluster.{BrokerEndPoint, EndPoint, Broker} import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} import kafka.common._ import kafka.message.{Message, ByteBufferMessageSet} @@ -122,7 +122,7 @@ object SerializationTestUtils { val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1) val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)), ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet))) - new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndpoint](), 0, 1, 0, "") + new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndPoint](), 0, 1, 0, "") } def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index ad58eed..bb2506c 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -91,12 +91,12 @@ class BrokerEndPointTest extends JUnit3Suite with Logging { @Test def testBrokerEndpointFromURI() = { var connectionString = "localhost:9092" - var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString) + var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString) assert(endpoint.host == "localhost") assert(endpoint.port == 9092) // also test for ipv6 connectionString = "[::1]:9092" - endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString) + endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString) assert(endpoint.host == "::1") assert(endpoint.port == 9092) } diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 603cf76..995b059 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -23,7 +23,7 @@ import kafka.zk.ZooKeeperTestHarness import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ -import kafka.cluster.{BrokerEndpoint, Broker} +import kafka.cluster.{BrokerEndPoint, Broker} import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.server.{KafkaServer, KafkaConfig} @@ -33,7 +33,7 @@ import kafka.client.ClientUtils class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { private var server1: KafkaServer = null - var brokerEndPoints: Seq[BrokerEndpoint] = null + var brokerEndPoints: Seq[BrokerEndPoint] = null override def setUp() { super.setUp() diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 2169a5c..be4bb87 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -23,7 +23,7 @@ import junit.framework.Assert._ import org.easymock.EasyMock import org.junit.Test import kafka.api._ -import kafka.cluster.{BrokerEndpoint, Broker} +import kafka.cluster.{BrokerEndPoint, Broker} import kafka.common._ import kafka.message._ import kafka.producer.async._ @@ -165,8 +165,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("metadata.broker.list", brokerList) - val broker1 = new BrokerEndpoint(0, "localhost", 9092) - val broker2 = new BrokerEndpoint(1, "localhost", 9093) + val broker1 = new BrokerEndPoint(0, "localhost", 9092) + val broker2 = new BrokerEndPoint(1, "localhost", 9093) // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) @@ -469,7 +469,7 @@ class AsyncProducerTest extends JUnit3Suite { } private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - val broker1 = new BrokerEndpoint(brokerId, brokerHost, brokerPort) + val broker1 = new BrokerEndPoint(brokerId, brokerHost, brokerPort) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } -- 1.9.5 (Apple Git-50.3)