diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala index a030033..8d5ebee 100644 --- a/core/src/main/scala/kafka/network/Handler.scala +++ b/core/src/main/scala/kafka/network/Handler.scala @@ -17,6 +17,8 @@ package kafka.network +import java.nio.channels.SelectionKey + private[kafka] object Handler { /** @@ -24,6 +26,11 @@ private[kafka] object Handler { * transmission into an outgoing transmission */ type Handler = Receive => Option[Send] + + /** + * A coordinator request handler, which requires more metadata than Handler + */ + type CoordinatorHandler = (Receive, Processor, SelectionKey) => Option[Send] /** * A handler mapping finds the right Handler function for a given request diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 1bc6bc1..9180813 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -27,6 +27,7 @@ import kafka.utils._ import org.apache.log4j.Logger import kafka.api.RequestKeys +import kafka.server.{RegistryReplySend, KafkaRequestHandlers} /** * An NIO socket server. The thread model is @@ -36,14 +37,14 @@ import kafka.api.RequestKeys class SocketServer(val port: Int, val numProcessorThreads: Int, monitoringPeriodSecs: Int, - private val handlerFactory: Handler.HandlerMapping, + private val handlerFactory: KafkaRequestHandlers, val sendBufferSize: Int, val receiveBufferSize: Int, val maxRequestSize: Int = Int.MaxValue) { private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) - private var acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize) + private val acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize) val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs) /** @@ -51,7 +52,7 @@ class SocketServer(val port: Int, */ def startup() { for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(handlerFactory, time, stats, maxRequestSize) + processors(i) = new Processor(i, handlerFactory, time, stats, maxRequestSize) Utils.newThread("kafka-processor-" + i, processors(i), false).start() } Utils.newThread("kafka-acceptor", acceptor, false).start() @@ -186,7 +187,8 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce * Thread that processes all requests from a single connection. 
There are N of these running in parallel * each of which has its own selectors */ -private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping, +private[kafka] class Processor(val id: Int, + val handlerMapping: KafkaRequestHandlers, val time: Time, val stats: SocketServerStats, val maxRequestSize: Int) extends AbstractServerThread { @@ -286,14 +288,36 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping, requestLogger.trace("Handling multi-produce request from " + channelFor(key).socket.getRemoteSocketAddress()) case RequestKeys.Offsets => requestLogger.trace("Handling offset request from " + channelFor(key).socket.getRemoteSocketAddress()) + case RequestKeys.ClusterMetadata => + requestLogger.trace("Handling cluster metadata request from " + channelFor(key).socket.getRemoteSocketAddress()) + case RequestKeys.RegisterConsumer => + requestLogger.trace("Handling register consumer request from " + channelFor(key).socket.getRemoteSocketAddress()) case _ => throw new InvalidRequestException("No mapping found for handler id " + requestTypeId) } } - val handler = handlerMapping(requestTypeId, request) - if(handler == null) - throw new InvalidRequestException("No handler found for request") - val start = time.nanoseconds - val maybeSend = handler(request) + + // Here we have to hack the socket server a little bit since the request + // does not have the key information for the coordinator to store the map + // + // In 0.8 socket server this is unnecessary since the key info is included + // in the request + var start: Long = -1 + var maybeSend: Option[Send] = null + if (RequestKeys.isCoordinatorRequest(requestTypeId)) { + val handler = handlerMapping.coordinatorHandleFor(requestTypeId, request) + if(handler == null) + throw new InvalidRequestException("No handler found for request") + start = time.nanoseconds + maybeSend = handler(request, this, key) + } + else { + val handler = handlerMapping.handlerFor(requestTypeId, request) + if(handler == null) + throw new InvalidRequestException("No handler found for request") + start = time.nanoseconds + maybeSend = handler(request) + } + stats.recordRequest(requestTypeId, time.nanoseconds - start) maybeSend } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index af238d6..2894b49 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -87,4 +87,19 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */ val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", "")) + + /* max number of retries during consumer rebalance */ + val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", 4) + + /* backoff time between retries during rebalance */ + val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs) + + /* rebalancing threadpool size */ + val numRebalanceThreads = Utils.getIntInRange(props, "num.rebalance.threads", 1, (1, Int.MaxValue)) + + /* minimum viable value for consumer session timeout */ + val minCoordinatorSessionTimeoutMs = Utils.getInt(props, "min.coordinator.session.timeout.ms", 2000) + + /* maximum number of failed requests to consumers */ + val maxNumRequestFailures = Utils.getInt(props, "max.num.request.failures", 4) } diff --git a/core/src/main/scala/kafka/server/KafkaConsumerCoordinator.scala 
b/core/src/main/scala/kafka/server/KafkaConsumerCoordinator.scala new file mode 100644 index 0000000..f454205 --- /dev/null +++ b/core/src/main/scala/kafka/server/KafkaConsumerCoordinator.scala @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + + +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState +import java.util.concurrent.atomic.AtomicBoolean +import collection.mutable.{Set, Map} +import kafka.utils._ +import kafka.common._ +import java.util.concurrent.LinkedBlockingQueue +import scala.collection.JavaConversions._ +import kafka.api.{PingRequest, ConsumerRegitry} + + +class KafkaConsumerCoordinator(val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig) extends Logging { + + /** + * On startup: + * + * 1. Read the consumer group info, register child change watch on /consumers/[group]/ids for each group, and /consumers for group changes. + * + * 2. Register child change watch for /brokers/topics/[topic] for each topic. + * + * 3. Register a listener for session expiration events. + * + * 4. For each group, keep an atomic boolean specifying if the group is under rebalancing. 
+   */
+  private val lock = new Object
+
+  private val elector = new ZkElection(zkClient, ZkUtils.ConsumerCoordinatorPath, becomeConsumerLeader, brokerId.toString)
+
+  val scheduler = new KafkaScheduler(1, "Kafka-ping-request-", false)
+
+  val consumerRegistries = Map[String, ConsumerRegitry]()
+
+  val consumersPerGroup = Map[String, Set[String]]()
+
+  val groupsBeingRebalanced = Map[String, AtomicBoolean]()
+
+  val consumerGroupsPerTopic = Map[String, Set[String]]()
+
+  val groupsWithWildcardTopics = Set[String]()
+
+  val rebalanceRequestQ = new LinkedBlockingQueue[String](1000)
+
+  val pingRequestPurgatory = new PingRequestPurgatory(this)
+
+  var requestHandler: RebalanceRequestHandler = null
+
+  var requestHandlers: Array[RebalanceRequestHandler] = null
+
+  def amCurrentCoordinator = elector.amLeader
+
+  def currentCoordinatorId = elector.leaderId.toInt
+
+  def becomeConsumerLeader(id: String) {
+    // Check that it is really this broker that has become the coordinator
+    if (id.toInt != brokerId)
+      throw new UnexpectedNameIdException("The coordinator startup logic is called for a different server")
+
+    startup
+  }
+
+  def startup = {
+
+    // Add each existing topic as a candidate of some groups' interest
+    val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+
+    if (topics == Nil) {
+      zkClient.createPersistent(ZkUtils.BrokerTopicsPath, true)
+    }
+    else {
+      for (topic <- topics) consumerGroupsPerTopic += (topic -> Set.empty[String])
+    }
+
+    // Register the listeners for brokers and their partitions
+    registerTopicChangeListener
+    registerTopicPartitionChangeListener
+
+    // Register the listeners for consumer groups and consumers
+    //registerGroupChangeListener
+    //registerGroupMemberChangeListener
+
+    // Register session expiration listener
+    registerSessionExpirationListener
+
+    // Start processing rebalance requests
+    requestHandler = new RebalanceRequestHandler(zkClient, this, config)
+    //requestHandler.start
+
+    requestHandlers = new Array[RebalanceRequestHandler](config.numRebalanceThreads)
+    for(i <- 0 until config.numRebalanceThreads) {
+      requestHandlers(i) = new RebalanceRequestHandler(zkClient, this, config)
+      //requestHandlers(i).start
+    }
+  }
+
+
+
+  def shutdown() {
+    // First resign the elector to let others know
+    // so that they can run re-election
+    elector.close
+
+    // Then unsubscribe all the watchers before closing the zkClient
+    zkClient.unsubscribeAll()
+
+    // If the requestHandler was ever created, shut it down
+    if (requestHandler != null)
+      requestHandler.shutdown
+
+    // Then reset the state by clearing the cache
+    reset
+
+    // Catch and ignore interruption exception while closing zkClient
+    zkClient.close()
+
+    info("Consumer coordinator for broker %s shutdown completed".format(brokerId))
+  }
+
+  def reset = {
+    groupsBeingRebalanced.clear
+    consumerGroupsPerTopic.clear
+    groupsWithWildcardTopics.clear
+  }
+
+
+  def autoPing(consumer: String) {
+    trace("auto pinging consumer " + consumer)
+    try {
+      pingConsumer(consumer)
+    }
+    catch {
+      case t: Throwable =>
+        // log it and let it go
+        error("exception during autoPing: ", t)
+    }
+  }
+
+  def pingConsumer(consumer: String) {
+
+    // Send the ping request to the consumer
+    val request = new PingRequest(consumer, consumerRegistries(consumer).numFailedRequests)
+
+    val delayMs = consumerRegistries(consumer).sessionTimeoutMs * 2/3
+
+    // Set the watcher for the ping request
+    val delayedPing = new DelayedPing(List[String](consumer), request, delayMs)
+    pingRequestPurgatory.watch(delayedPing)
+  }
+
+  def requestExpired(consumer: String) {
+    consumerRegistries(consumer).numFailedRequests += 1
+
+    // TODO: Check if the number of expirations has reached the limit
+  }
+
+  // Watch for partition changes on the brokers
+  private def registerTopicPartitionChangeListener() = {
+    val topics = zkClient.getChildren(ZkUtils.BrokerTopicsPath)
+    for (topic <- topics) {
+      zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, new TopicPartitionChangeListener)
+    }
+  }
+
+  // Watch for topic changes
+  private def registerTopicChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener)
+  }
+
+
+  // Watch for group member changes
+  private def registerGroupMemberChangeListener() = {
+    val groups = zkClient.getChildren(ZkUtils.ConsumerGroupsPath)
+    for (group <- groups) {
+      zkClient.subscribeChildChanges(ZkUtils.ConsumerGroupsPath + "/" + group + "/ids", new GroupMemberChangeListener)
+    }
+  }
+
+  def handleNewGroup(group: String) = {
+    /* TODO
+    1. Read all the topics this group is interested in, and for each topic:
+
+    1.1. If the topic already exists in the coordinator's memory, update its list by adding this group
+
+    1.2. Otherwise add this topic with the new group as its only interested group
+
+    2. If the interested topics are in the wildcard format, add this group to the list of wildcard-interested groups
+
+    3.1. If the group already has some existing topics of interest, create its rebalance bit initialized as true, and try to rebalance the group
+
+    3.2. Otherwise just create its rebalance bit initialized as false
+    */
+  }
+
+  // Watch for group changes
+  private def registerGroupChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.ConsumerGroupsPath, new GroupChangeListener)
+  }
+
+  def handleAddedGroupMember(consumer: String) = {
+    /* TODO
+    1. If the rebalance bit for the group that the consumer belongs to is not set, set it and try to rebalance the group
+    */
+  }
+
+  // Watch for session timeout
+  private def registerSessionExpirationListener() = {
+    zkClient.subscribeStateChanges(new ConsumerCoordinatorSessionExpireListener)
+  }
+
+
+  class ConsumerCoordinatorSessionExpireListener extends IZkStateListener {
+
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. Try to elect as the leader
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      /**
+       * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+       * connection for us.
+       */
+      lock synchronized {
+        info(brokerId + " ZK expired; check if I am the coordinator")
+
+        // First reset myself, then try to re-elect
+        // Note that we do not need to unsubscribe all listeners, since they are already lost with the new session
+        reset
+
+        // The state listener in ZkClient does not re-register itself once fired,
+        // so we have to re-subscribe this listener manually
+        registerSessionExpirationListener
+
+        if (elector.elect) startup
+      }
+    }
+  }
+
+  class GroupChangeListener extends IZkChildListener with Logging {
+
+    /**
+     * Called when a new consumer group is registered;
+     * an existing consumer group should not be deleted since the group nodes are persistent
+     *
+     * @throws Exception
+     *             On any error.
+ */ + @throws(classOf[Exception]) + def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + import scala.collection.JavaConversions._ + + lock synchronized { + info("%s GroupChange listener fired for path %s with children %s" + .format(brokerId, parentPath, curChilds)) + + // Update in-memory data and subscribe watchers with newly registered groups + val addedGroups = curChilds filterNot (groupsBeingRebalanced.keySet contains) + + if (addedGroups.nonEmpty) + info("Group event: added groups = %s".format(addedGroups)) + + for (group <- addedGroups) { + // Watch for group member changes + zkClient.subscribeChildChanges(ZkUtils.ConsumerGroupsPath + "/" + group + "/ids", + new GroupMemberChangeListener) + + // Update consumerGroupsPerTopic + val (topics, wildcard) = ZkUtils.getTopicsForGroupMaybeWildcard(zkClient, group) + topics.map { + topic => + // Remember the group's interested topics + if (!consumerGroupsPerTopic.contains(topic)) consumerGroupsPerTopic += (topic -> Set(group)) + else consumerGroupsPerTopic(topic).add(group) + } + + // Update groupsWithWildcardTopics + if (wildcard) { + groupsWithWildcardTopics += group + info("Record group %s as with wildcard topic interests".format(group)) + } + + // If the group already has some consumers with some interested + // topics registered, immediately request rebalance + if (!topics.isEmpty) { + groupsBeingRebalanced += (group -> new AtomicBoolean(true)) + rebalanceRequestQ.put(group) + info("Put the rebalance request for newly added group %s since it already has some interested topics %s".format(group, topics)) + } + else + groupsBeingRebalanced += (group -> new AtomicBoolean(false)) + } + + // Since groups are persistent path, they should never be deleted + val deletedGroups = groupsBeingRebalanced.keySet filterNot (curChilds contains) + + if (deletedGroups.nonEmpty) + throw new UnexpectedZookeeperASyncException("Groups %s are deleted, which should not happen" + .format(deletedGroups)) + } + } + } + + class TopicChangeListener extends IZkChildListener with Logging { + + /** + * Called when a new topic group is registered; + * an existing topic should not be deleted since the group nodes are persistent + * + * @throws Exception + * On any error. 
+ */ + @throws(classOf[Exception]) + def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + import scala.collection.JavaConversions._ + + lock synchronized { + info("%s TopicChange listener fired for path %s with children %s" + .format(brokerId, parentPath, curChilds)) + + // Update in-memory data and subscribe watchers with newly registered groups + val addedTopics = curChilds filterNot (consumerGroupsPerTopic.keySet contains) + + if (addedTopics.nonEmpty) + info("Topic event: added topics = %s".format(addedTopics)) + + for (topic <- addedTopics) { + // Watch for topic partition changes + zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, new TopicPartitionChangeListener) + + // Get groups with static interests that contain this topic + var staticGroups = Set[String]() + + if (!consumerGroupsPerTopic.contains(topic)) consumerGroupsPerTopic += (topic -> Set.empty[String]) + else staticGroups = consumerGroupsPerTopic(topic) + + // Get groups with wildcard interests that match this topic + var wildcardGroups = Set[String]() + + groupsWithWildcardTopics.foreach ( + group => + if (ZkUtils.getTopicsForGroup(zkClient, group) contains topic) wildcardGroups += group + ) + + for (group <- (staticGroups | wildcardGroups)) { + if (groupsBeingRebalanced(group).compareAndSet(false, true)) { + rebalanceRequestQ.put(group) + info("Put the rebalance request for group %s due to added topic %s of interest".format(group, topic)) + } + } + } + + // Since topics are persistent path, they should never be deleted + val deletedTopics = consumerGroupsPerTopic.keySet filterNot (curChilds contains) + + if (deletedTopics.nonEmpty) + throw new UnexpectedZookeeperASyncException("Topics %s are deleted, which should not happen" + .format(deletedTopics)) + } + } + } + + class GroupMemberChangeListener extends IZkChildListener with Logging { + + /** + * Called when the group member information stored in zookeeper has changed. + * Try to request a rebalance for the group by setting the bit in groupsBeingRebalanced + * + * @throws Exception + * On any error. + */ + @throws(classOf[Exception]) + def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + lock synchronized { + info("%s GroupMemberChange listener fired for path %s with children %s" + .format(brokerId, parentPath, curChilds)) + + val group = parentPath.split("/").init.last + + info("Group %s has children changed".format(group)) + + // Update consumerGroupsPerTopic + val interestedTopics = ZkUtils.getTopicsForGroup(zkClient, group) + val notInterestedTopics = consumerGroupsPerTopic.keySet filterNot(interestedTopics contains) + + for (topic <- notInterestedTopics) { + if (consumerGroupsPerTopic(topic).contains(group)) consumerGroupsPerTopic(topic).remove(group) + } + + // If the group is already empty, no need to rebalance any more + val consumers = curChilds + + if (!consumers.isEmpty && groupsBeingRebalanced(group).compareAndSet(false, true)) { + rebalanceRequestQ.put(group) + info("Put the rebalance request for group %s due to group member change".format(group)) + } + } + } + } + + class TopicPartitionChangeListener extends IZkChildListener with Logging { + + /** + * Called when the topic's partition information stored in zookeeper has changed. + * Try to request a rebalance for each group interested in the changed topic + * by setting the bit in groupsBeingRebalanced + * + * @throws Exception + * On any error. 
+     */
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      lock synchronized {
+        info("%s TopicPartitionChange listener fired for path %s with children %s".
+          format(brokerId, parentPath, curChilds))
+
+        val topic = parentPath.split("/").last
+
+        // Get groups with static interests that contain this topic
+        val staticGroups = consumerGroupsPerTopic(topic)
+
+        // Get groups with wildcard interests that match this topic
+        var wildcardGroups = Set[String]()
+
+        groupsWithWildcardTopics.foreach (
+          group =>
+            if (ZkUtils.getTopicsForGroup(zkClient, group) contains topic) wildcardGroups += group
+        )
+
+        for (group <- (staticGroups | wildcardGroups)) {
+          if (groupsBeingRebalanced(group).compareAndSet(false, true)) {
+            rebalanceRequestQ.put(group)
+            info("Put the rebalance request for group %s due to topic %s partition change".format(group, topic))
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
index 26b1aaa..b54f390 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
@@ -25,11 +25,16 @@ import kafka.api._
 import kafka.common.ErrorMapping
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils._
+import kafka.cluster.Cluster
+import java.nio.channels.SelectionKey
+import collection.mutable.Set
+
 /**
  * Logic to handle the various Kafka requests
  */
-private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {
+private[kafka] class KafkaRequestHandlers(val logManager: LogManager,
+                                          val consumerCoordinator: KafkaConsumerCoordinator) extends Logging {
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -40,9 +45,81 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Lo
       case RequestKeys.MultiFetch => handleMultiFetchRequest _
       case RequestKeys.MultiProduce => handleMultiProducerRequest _
       case RequestKeys.Offsets => handleOffsetRequest _
+      case RequestKeys.ClusterMetadata => handlerClusterMetadataRequest _
       case _ => throw new IllegalStateException("No mapping found for handler id " + requestTypeId)
     }
   }
+
+  def coordinatorHandleFor(requestTypeId: Short, request: Receive): Handler.CoordinatorHandler = {
+    if (!consumerCoordinator.amCurrentCoordinator)
+      throw new IllegalStateException("A coordinator request with id " + requestTypeId + " is received by a non-coordinator")
+
+    requestTypeId match {
+      case RequestKeys.RegisterConsumer => handleRegisterConsumerRequest _
+      case _ => throw new IllegalStateException("No mapping found for coordinator handler id " + requestTypeId)
+    }
+  }
+
+  def handleRegisterConsumerRequest(receive: Receive, processor: Processor, key: SelectionKey): Option[Send] = {
+    val request = RegisterConsumerRequest.readFrom(receive.buffer)
+
+    // If the session timeout is less than the minimum viable value, reject it
+    if (request.sessionTimeoutMs < consumerCoordinator.config.minCoordinatorSessionTimeoutMs)
+      return Some(new RegistryReplySend(request, ErrorMapping.RequestRejectCode))
+
+    // If the consumer is already registered, reject it
+    if (consumerCoordinator.consumerRegistries.contains(request.consumerId))
+      return Some(new RegistryReplySend(request, ErrorMapping.RequestRejectCode))
+
+    // Otherwise accept it
+    val consumerRegistry = new ConsumerRegitry(request, processor.id, key)
+
+    // If a new group is detected, handle the newly added group
+
if (!consumerCoordinator.consumersPerGroup.contains(consumerRegistry.groupId)) { + consumerCoordinator.consumersPerGroup.put(consumerRegistry.groupId, + Set[String](consumerRegistry.consumerId)) + + consumerCoordinator.handleNewGroup(consumerRegistry.groupId) + } + else { + consumerCoordinator.consumersPerGroup(consumerRegistry.groupId) += consumerRegistry.consumerId + } + + // Record the registry entry to coordinator + consumerCoordinator.consumerRegistries.put(request.consumerId, consumerRegistry) + + // For the newly added consumer to a known group, handle the added consumer + consumerCoordinator.handleAddedGroupMember(request.consumerId) + + // Add the ping task to the scheduler + val pingCommand = new Runnable() { + def run() = { + try { + consumerCoordinator.autoPing(request.consumerId) + } + catch { + case t => + // log any error and the stack trace + error("error in pingCommand runnable", t) + } + } + } + consumerCoordinator.scheduler.scheduleParamterizedWithRate(pingCommand, request.sessionTimeoutMs * 1/3, request.sessionTimeoutMs * 1/3) + + Some(new RegistryReplySend(request, ErrorMapping.NoError)) + } + + def handlerClusterMetadataRequest(receive: Receive): Option[Send] = { + // If the server is not completely started so the coordinator does not exist yet, + // return the server_not_ready error code + if (consumerCoordinator == null) + return Some(new ClusterInfoSend(new Cluster(), -1, ErrorMapping.ServerNotReadyCode)) + + val cluster = ZkUtils.getCluster(consumerCoordinator.zkClient) + val coordinatorId = consumerCoordinator.currentCoordinatorId + + Some(new ClusterInfoSend(cluster, coordinatorId)) + } def handleProducerRequest(receive: Receive): Option[Send] = { val sTime = SystemTime.milliseconds diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b2abb08..09bbed0 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -20,9 +20,14 @@ package kafka.server import kafka.log.LogManager import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean -import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging} import kafka.network.{SocketServerStats, SocketServer} import java.io.File +import org.I0Itec.zkclient.ZkClient +import kafka.utils._ +import kafka.common.UnexpectedNameIdException +import org.I0Itec.zkclient.exception.{ZkInterruptedException, ZkNoNodeException} +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.KeeperException.NoNodeException /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -37,10 +42,12 @@ class KafkaServer(val config: KafkaConfig) extends Logging { var socketServer: SocketServer = null - val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) + var scheduler: KafkaScheduler = null private var logManager: LogManager = null + var consumerCoordinator: KafkaConsumerCoordinator = null + /** * Start up API for bringing up a single instance of the Kafka server. 
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers @@ -55,18 +62,24 @@ class KafkaServer(val config: KafkaConfig) extends Logging { needRecovery = false cleanShutDownFile.delete } + scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) logManager = new LogManager(config, scheduler, SystemTime, 1000L * 60 * config.logCleanupIntervalMinutes, 1000L * 60 * 60 * config.logRetentionHours, needRecovery) + + val coordinatorZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, ZKStringSerializer) + + consumerCoordinator = new KafkaConsumerCoordinator(coordinatorZkClient, config.brokerId, config) - val handlers = new KafkaRequestHandlers(logManager) + val handlers = new KafkaRequestHandlers(logManager, consumerCoordinator) socketServer = new SocketServer(config.port, config.numThreads, config.monitoringPeriodSecs, - handlers.handlerFor, + handlers, config.socketSendBuffer, config.socketReceiveBuffer, config.maxSocketRequestSize) @@ -78,6 +91,7 @@ class KafkaServer(val config: KafkaConfig) extends Logging { * So this should happen after socket server start. */ logManager.startup() + info("Kafka server started.") } @@ -93,9 +107,24 @@ class KafkaServer(val config: KafkaConfig) extends Logging { if (socketServer != null) socketServer.shutdown() Utils.unregisterMBean(statsMBeanName) + + // We need to first close the coordinator's zkClient then the broker's zkClient + // TODO: need to throw Zk exceptions during shutdown due to inconsistent Zookeeper views + if (consumerCoordinator != null) { + try { + consumerCoordinator.shutdown + } + catch { + case _: NoNodeException => + case _: ZkNoNodeException => + case _: NoSuchElementException => + case e => throw e + } + } if (logManager != null) logManager.close() + val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) cleanShutDownFile.createNewFile diff --git a/core/src/main/scala/kafka/server/RegistryReplySend.scala b/core/src/main/scala/kafka/server/RegistryReplySend.scala new file mode 100644 index 0000000..6ed2dd4 --- /dev/null +++ b/core/src/main/scala/kafka/server/RegistryReplySend.scala @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.server
+
+import java.nio._
+import java.nio.channels._
+import kafka.network._
+import kafka.utils._
+import kafka.common.ErrorMapping
+import kafka.cluster.Cluster
+import scala.Array
+import kafka.api.RegisterConsumerRequest
+
+/**
+ * The reply to a consumer registration request: a fixed-size header consisting
+ * of a 4-byte size field (always 2) followed by the 2-byte error code
+ */
+@nonthreadsafe
+class RegistryReplySend(val registryRequest: RegisterConsumerRequest, val errorCode: Int) extends Send {
+
+  private val header = ByteBuffer.allocate(6)
+
+  header.putInt(2 /* payload size: just the 2-byte error code */)
+  header.putShort(errorCode.asInstanceOf[Short])
+  header.rewind()
+
+  var complete: Boolean = false
+
+  def writeTo(channel: GatheringByteChannel): Int = {
+    expectIncomplete()
+    val written = channel.write(header)
+    // if we are done, mark it off
+    if(!header.hasRemaining)
+      complete = true
+    written
+  }
+
+}
\ No newline at end of file
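
Note on the wire format above: RegistryReplySend writes a fixed 6-byte frame, a 4-byte size field (always 2) followed by the 2-byte error code. As a minimal, illustrative sketch only (not part of this patch; the RegistryReplyDecoder object and readErrorCode helper are invented names), a client could decode that frame as follows:

import java.nio.ByteBuffer

// Hypothetical client-side decoder for the 6-byte reply written by RegistryReplySend:
// a 4-byte payload size (always 2) followed by a 2-byte error code.
object RegistryReplyDecoder {
  def readErrorCode(buffer: ByteBuffer): Short = {
    val size = buffer.getInt()   // payload size written first by RegistryReplySend
    require(size == 2, "unexpected registry reply size: " + size)
    buffer.getShort()            // the error code, e.g. ErrorMapping.NoError
  }
}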