diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1bf7d10..0ee4438 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -33,6 +33,7 @@ import kafka.common.ErrorMapping import kafka.network.{Receive, BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge +import org.apache.kafka.common.KafkaException /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -84,6 +85,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start log manager */ logManager = createLogManager(zkClient, brokerState) logManager.startup() + + verifyUniqueHostPort(config, zkClient) socketServer = new SocketServer(config.brokerId, config.hostName, @@ -358,5 +361,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler) } + private def verifyUniqueHostPort(config: KafkaConfig, zkClient: ZkClient): Unit = { + val hostToCheck = + if(config.hostName == null || config.hostName.trim.isEmpty) + "localhost" + else + config.hostName + + val portToCheck = config.port + val allBrokers = ZkUtils.getAllBrokersInCluster(zkClient) + + for(b <- allBrokers) { + if(b.host == hostToCheck && b.port == portToCheck) { + throw new KafkaException("Host/port combination %s:%d is in use by an existing broker".format(hostToCheck, portToCheck)) + } + } + } }