diff --git a/core/src/main/scala/kafka/common/NoTopicsForConsumersException.scala b/core/src/main/scala/kafka/common/NoTopicsForConsumersException.scala deleted file mode 100644 index 28a231c..0000000 --- a/core/src/main/scala/kafka/common/NoTopicsForConsumersException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a request is made for a topic but it doest - * exist in the Zookeeper - */ -class NoTopicsForConsumersException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/UnexpectedNameIdException.scala b/core/src/main/scala/kafka/common/UnexpectedNameIdException.scala deleted file mode 100644 index 87d9794..0000000 --- a/core/src/main/scala/kafka/common/UnexpectedNameIdException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a name/id for some server/producer/consumer - * is not equal to what is expected - */ -class UnexpectedNameIdException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/CoordinatorRequestHandler.scala b/core/src/main/scala/kafka/consumer/CoordinatorRequestHandler.scala deleted file mode 100644 index d7db013..0000000 --- a/core/src/main/scala/kafka/consumer/CoordinatorRequestHandler.scala +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.consumer - - -import org.I0Itec.zkclient.ZkClient -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.CountDownLatch -import kafka.common.{UnknownCodecException, ChannelHasMultipleMessageException} -import collection.mutable.HashMap -import kafka.utils._ -import util.parsing.json.JSON - -object CoordinatorRequest extends Logging { - val Option = "option" - val StartFetcher = "start-fetcher" - val StopFetcher = "stop-fetcher" - - def getCoordinatorRequest(requestJson: String): CoordinatorRequest = { - var topMap : HashMap[String, String] = null - try { - JSON.parseFull(requestJson) match { - case Some(m) => - topMap = m.asInstanceOf[HashMap[String, String]] - val requestOpt = topMap.get(Option) - requestOpt match { - case Some(request) => - request match { - case StartFetcher => { - topMap.remove(Option) - new StartFetcher(topMap) - } - case StopFetcher => new StopFetcher() - case _ => throw new IllegalStateException("Unknown coordinator request : " + request) - } - case None => - throw new IllegalStateException("Illegal coordinator request JSON : " + requestJson) - } - case None => throw new RuntimeException("Error parsing coordinator request JSON : " + requestJson) - } - } catch { - case e => - error("Error parsing coordinator request JSON " + requestJson, e) - throw e - } - } -} - -sealed trait CoordinatorRequest extends Logging { - def state: String - - def toJson(): String -} - -case class StopFetcher() extends CoordinatorRequest { - val state: String = CoordinatorRequest.StopFetcher - - - def toJson(): String = { - val jsonMap = new HashMap[String, String] - jsonMap.put(CoordinatorRequest.Option, state) - Utils.stringMapToJsonString(jsonMap) - } -} - -case class StartFetcher(ownership: HashMap[String, String]) extends CoordinatorRequest { - val state: String = CoordinatorRequest.StopFetcher - - def toJson(): String = { - ownership.put(CoordinatorRequest.Option, state) - Utils.stringMapToJsonString(ownership) - } -} - - -class CoordinatorRequestHandler(name: String, config: ConsumerConfig) extends Thread(name) with Logging { - - val stateChangeQ: ZkQueue = new ZkQueue( - new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer), - ZkUtils.ConsumerStatePath + "/" + config.consumerId, 1) - - val fetcherStopped: AtomicBoolean = new AtomicBoolean(true) - val isRunning: AtomicBoolean = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) - - // TODO: For now we stop/start all FetcherRunnables given the assumption that at least one partition of the topic is being fetched by any consumer - - override def run() { - try { - while(isRunning.get()) { - // get outstanding state change requests for this broker - val command = stateChangeQ.take() - - // check this is the only command - if (!stateChangeQ.isFull) throw new ChannelHasMultipleMessageException("Channel has multiple messages") - - if (command._2 == CoordinatorRequest.StartFetcher) handleStartFetcher() - else if (command._2 == CoordinatorRequest.StopFetcher) handleStopFetcher() - else throw new UnknownCodecException("Unknown consumer state change command") - - stateChangeQ.remove(command) - } - }catch { - case e: InterruptedException => info("State change command handler interrupted. Shutting down") - case e1 => error("Error in state change command handler. Shutting down due to ", e1) - } - shutdownComplete() - } - - private def shutdownComplete() = shutdownLatch.countDown - - def shutdown() { - isRunning.set(false) - interrupt() - shutdownLatch.await() - info("State change command handler shutdown completed") - } - - def handleStartFetcher() { - info("Received start fetcher state change command") - // TODO: Start fetcher by reading the Zookeeper - fetcherStopped.set(false) - } - - def handleStopFetcher() { - info("Received close replica state change command") - // TODO: Stop fetcher, clean queue, commit offset - fetcherStopped.set(true) - } -} diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 2bba2b2..f7782df 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -104,10 +104,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null - private var coordinatorRequestHandler: CoordinatorRequestHandler = null - - def fetcherStopped: Boolean = coordinatorRequestHandler.fetcherStopped.get() - val consumerIdString = { var consumerUuid : String = null config.consumerId match { @@ -138,10 +134,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (messageStreamCreated.getAndSet(true)) throw new RuntimeException(this.getClass.getSimpleName + " can create message streams at most once") - - coordinatorRequestHandler = new CoordinatorRequestHandler(config.consumerId + "-coordinator-request-handler", config) - coordinatorRequestHandler.start() - consume(topicCountMap, decoder) } @@ -180,10 +172,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient.close() zkClient = null } - if (coordinatorRequestHandler != null) { - coordinatorRequestHandler.shutdown() - coordinatorRequestHandler = null - } } catch { case e => diff --git a/core/src/main/scala/kafka/server/KafkaConsumerCoordinator.scala b/core/src/main/scala/kafka/server/KafkaConsumerCoordinator.scala deleted file mode 100644 index 0c2808c..0000000 --- a/core/src/main/scala/kafka/server/KafkaConsumerCoordinator.scala +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server - -import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} -import org.apache.zookeeper.Watcher.Event.KeeperState -import java.util.concurrent.atomic.AtomicBoolean -import collection.mutable.{Set, Map} -import kafka.utils.ZkUtils._ -import kafka.utils._ -import kafka.common._ - - -class KafkaConsumerCoordinator(zkClient: ZkClient, brokerId: Int) extends Logging { - - /** - * On startup: - * - * 1. Read the consumer group info, register child change watch on /consumers/[group]/ids for each group, and /consumers for group changes. - * - * 2. Register child change watch for /brokers/topics/[topic] for each topic. - * - * 3. Register a listener for session expiration events. - * - * 4. For each group, keep an atomic boolean specifying if the group is under rebalancing. - */ - var groupsBeingRebalanced = Map[String, AtomicBoolean]() - - var consumerGroupsPerTopic = Map[String, Set[String]]() - - var groupsWithWildcardTopics = Set[String]() - - var rebalanceRequestQ: ZkQueue = null - - var requestHandler: RebalanceRequestHandler = null - - private val elector = new ZkElection(zkClient, ZkUtils.BrokerCoordinationPath, becomeConsumerLeader, brokerId.toString) - - def consumerCoordId: Int = elector.leaderId.toInt - - def becomeConsumerLeader(id: String) { - // Check it is really me who become the coordinator - if (id.toInt != brokerId) - throw new UnexpectedNameIdException("The coordinator startup logic is called for a different server") - - startup - } - - def startup = { - - import scala.collection.JavaConversions._ - - val topics = zkClient.getChildren(ZkUtils.BrokerTopicsPath) - - rebalanceRequestQ = new ZkQueue(zkClient, BrokerCoordinationPath + "/queue", 10) - - val groupsNeedRebalance = rebalanceRequestQ.readAll().map(f => f._2) - - for (topic <- topics) { - - // Watch for topic broker changes - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, new BrokerChangeListener) - - // Add the topic to as the candidate of some groups' interest - consumerGroupsPerTopic += (topic -> Set.empty[String]) - } - - val groups = zkClient.getChildren(ZkUtils.ConsumersPath) - - for (group <- groups) { - - // Watch for group member changes - zkClient.subscribeChildChanges(ZkUtils.ConsumersPath + "/" + group + "/ids", new GroupMemberChangeListener) - - // Keep the boolean indicating if the group is under rebalancing operation or not - // When the group already requires rebalance, directly set to true - if (groupsNeedRebalance contains group) - groupsBeingRebalanced += (group -> new AtomicBoolean(true)) - else - groupsBeingRebalanced += (group -> new AtomicBoolean(false)) - - // TODO: We assume all the partitions of the interested topics are fetched by some consumer in the group - val (topics, wildcard) = ZkUtils.getTopicsForGroupMaybeWildcard(zkClient, group) - - if (wildcard) groupsWithWildcardTopics += group - - topics.map { - topic => - // Remember the group's interested topics - if (!consumerGroupsPerTopic.contains(topic)) throw new NoTopicsForConsumersException( - "%s cannot be found in ZK to be registered for interested group %s".format(topic, group)) - else consumerGroupsPerTopic(topic).add(group) - - // Check if this group does not know the ownership info for the topic and hence requires rebalance - val topicDirs = new ZKGroupTopicDirs(group, topic) - makeSurePersistentPathExists(zkClient, topicDirs.consumerOwnerDir) - - val assignedPartitions = zkClient.getChildren(topicDirs.consumerOwnerDir) - - if (assignedPartitions.isEmpty && groupsBeingRebalanced(group).compareAndSet(false, true)) - rebalanceRequestQ.put(group) - } - } - - // Watch for group changes - zkClient.subscribeChildChanges(ZkUtils.ConsumersPath, new GroupChangeListener) - - // Watch for session timeout - zkClient.subscribeStateChanges(new ConsumerCoordinatorSessionExpireListener) - - // Start processing rebalance requests - requestHandler = new RebalanceRequestHandler(zkClient, this) - requestHandler.start - } - - - - def shutdown() { - // First resign the elector to let others know - // so that they can run re-election - elector.close - - // If the requestHandler is once created, shut it down - if (requestHandler != null) - requestHandler.shutdown - - reset - - info("State change command handler shutdown completed") - } - - def reset = { - groupsBeingRebalanced.clear - consumerGroupsPerTopic.clear - groupsWithWildcardTopics.clear - } - - class ConsumerCoordinatorSessionExpireListener extends IZkStateListener { - - @throws(classOf[Exception]) - def handleStateChanged(state: KeeperState) { - // do nothing, since zkclient will do reconnect for us. - } - - /** - * Called after the zookeeper session has expired and a new session has been created. Try to elect as the leader - * - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleNewSession() { - /** - * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a - * connection for us. - */ - info(brokerId + " ZK expired; check if I am the coordinator") - - // First reset myself, then try to re-elect - reset - - if (elector.elect) startup - } - - } - - class GroupChangeListener extends IZkChildListener with Logging { - - /** - * Called when a new consumer group is registered; - * an existing consumer group should not be deleted since the group nodes are persistent - * - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - - info("%s ZkGroupChange listener fired for path %s with children %s" - .format(brokerId, parentPath, curChilds.toString)) - - import scala.collection.JavaConversions._ - - // Update in-memory data and subscribe watchers with newly registered groups - val addedGroups = curChilds filter (groupsBeingRebalanced.keySet contains) - - if (addedGroups.nonEmpty) - info("Group event: added groups = %s" - .format(addedGroups)) - - for (group <- addedGroups) { - - // Update groupsWithWildcardTopics - val (topics, wildcard) = ZkUtils.getTopicsForGroupMaybeWildcard(zkClient, group) - if (wildcard) groupsWithWildcardTopics += group - - // If the group already has some consumers with some interested - // topics registered, immediately request rebalance - if (!topics.isEmpty) { - groupsBeingRebalanced += (group -> new AtomicBoolean(true)) - rebalanceRequestQ.put(group) - } - else - groupsBeingRebalanced += (group -> new AtomicBoolean(false)) - - // Update consumerGroupsPerTopic - topics.map { - topic => - // Remember the group's interested topics - if (!consumerGroupsPerTopic.contains(topic)) throw new NoTopicsForConsumersException( - "%s cannot be found in ZK to be registered for interested group %s".format(topic, group)) - else consumerGroupsPerTopic(topic).add(group) - } - - // Watch for group member changes - zkClient.subscribeChildChanges(ZkUtils.ConsumersPath + "/" + group + "/ids", new GroupMemberChangeListener) - } - - // Since groups are persistent path, they should never be deleted - val deletedGroups = groupsBeingRebalanced.keySet filter (curChilds contains) - - if (deletedGroups.nonEmpty) - throw new UnexpectedZookeeperASyncException("Group %s is deleted, which should not happen" - .format(deletedGroups)) - } - } - - class GroupMemberChangeListener extends IZkChildListener with Logging { - - /** - * Called when the group member information stored in zookeeper has changed. - * Try to request a rebalance for the group by setting the bit in groupsBeingRebalanced - * - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - - info("%s ZkGroupMemberChange listener fired for path %s with children %s". - format(brokerId, parentPath, curChilds.toString)) - - val group = parentPath.split("/").last - - // If the group is already empty, no need to rebalance any more - val dirs = new ZKGroupDirs(group) - val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) - - if (!consumers.isEmpty && groupsBeingRebalanced(group).compareAndSet(false, true)) rebalanceRequestQ.put(group) - } - } - - class BrokerChangeListener extends IZkChildListener with Logging { - - /** - * Called when the broker's topic information stored in zookeeper has changed. - * Try to request a rebalance for each group interested in the changed topic - * by setting the bit in groupsBeingRebalanced - * - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - - info("%s ZkGroupMemberChange listener fired for path %s with children %s". - format(brokerId, parentPath, curChilds.toString)) - - val topic = parentPath.split("/").last - - // Get groups with static interests that contain this topic - val staticGroups = consumerGroupsPerTopic(topic) - - // Get groups with wildcard interests that match this topic - var wildcardGroups = Set[String]() - - groupsWithWildcardTopics.foreach ( - group => - if (ZkUtils.getTopicsForGroup(zkClient, group) contains topic) wildcardGroups += group - ) - - for (group <- staticGroups | wildcardGroups) { - // If the group is already empty, no need to rebalance any more - val dirs = new ZKGroupDirs(group) - val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) - - if (!consumers.isEmpty && groupsBeingRebalanced(group).compareAndSet(false, true)) rebalanceRequestQ.put(group) - } - } - } -} diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a6e83bd..b2abb08 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -20,11 +20,9 @@ package kafka.server import kafka.log.LogManager import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean +import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging} import kafka.network.{SocketServerStats, SocketServer} import java.io.File -import org.I0Itec.zkclient.ZkClient -import kafka.utils._ -import kafka.common.UnexpectedNameIdException /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -39,12 +37,10 @@ class KafkaServer(val config: KafkaConfig) extends Logging { var socketServer: SocketServer = null - var scheduler: KafkaScheduler = null + val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) private var logManager: LogManager = null - var consumerCoordinator: KafkaConsumerCoordinator = null - /** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers @@ -59,7 +55,6 @@ class KafkaServer(val config: KafkaConfig) extends Logging { needRecovery = false cleanShutDownFile.delete } - scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) logManager = new LogManager(config, scheduler, SystemTime, @@ -83,12 +78,6 @@ class KafkaServer(val config: KafkaConfig) extends Logging { * So this should happen after socket server start. */ logManager.startup() - - val coordinatorZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, ZKStringSerializer) - - consumerCoordinator = new KafkaConsumerCoordinator(coordinatorZkClient, config.brokerId) - info("Kafka server started.") } @@ -106,8 +95,6 @@ class KafkaServer(val config: KafkaConfig) extends Logging { Utils.unregisterMBean(statsMBeanName) if (logManager != null) logManager.close() - if (consumerCoordinator != null) - consumerCoordinator.shutdown val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) cleanShutDownFile.createNewFile diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 1fda59f..eb72b0f 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -669,21 +669,6 @@ object Utils extends Logging { } } - def stringMapToJsonString(jsonDataMap: Map[String, String]): String = { - val builder = new StringBuilder - builder.append("{ ") - var numElements = 0 - for ( (key, value) <- jsonDataMap) { - if (numElements > 0) - builder.append(",") - builder.append("\"" + key + "\": ") - builder.append("\"" + value + "\"") - numElements += 1 - } - builder.append(" }") - builder.toString - } - /** * Create a circular (looping) iterator over a collection. * @param coll An iterable over the underlying collection. diff --git a/core/src/main/scala/kafka/utils/ZkElection.scala b/core/src/main/scala/kafka/utils/ZkElection.scala deleted file mode 100644 index 7fb5236..0000000 --- a/core/src/main/scala/kafka/utils/ZkElection.scala +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import kafka.utils.ZkUtils._ -import org.I0Itec.zkclient.exception.ZkNodeExistsException -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} - - -class ZkElection(zkClient: ZkClient, path: String, actAsLeader: String => Unit, serverId: String) extends Logging { - - var leaderId = "" - - var electionPath = "/election" - - // create the election path in ZK, if one does not exist - makeSurePersistentPathExists(zkClient, path) - - // Try to become a leader - if (elect) actAsLeader(serverId) - - // The election module does not handle session expiration, instead it - // presumes the caller will handle it by probably try to re-elect again - - def amLeader : Boolean = leaderId.eq(serverId) - - def tryElectAndAct: Unit = { - // If I am already the leader, no need to elect and act - if (amLeader) true - else if (elect) actAsLeader(serverId) - } - - def elect: Boolean = { - - try { - createEphemeralPathExpectConflict(zkClient, path + electionPath, serverId) - info(serverId + " successfully elected as leader") - - leaderId = serverId - } catch { - case e: ZkNodeExistsException => - // If someone else has written the path, then - info("Someone else has elected as leader other than " + serverId) - - leaderId = ZkUtils.readData(zkClient, path + electionPath) - - case e2 => throw e2 - } - - zkClient.subscribeChildChanges(path, new ZkElectionListener) - - amLeader - } - - def resign = { - - // Only the leader can resign - val curLeaderId = ZkUtils.readData(zkClient, path + electionPath) - - assert(curLeaderId == serverId, "Cannot resign a non-leader for " + serverId) - - info(serverId + " decide to resign from leadership") - - zkClient.delete(path + electionPath) - - leaderId = "" - } - - def close = { - // If I am the leader, resign - if (leaderId == serverId) - resign - - zkClient.close() - } - - class ZkElectionListener extends IZkChildListener with Logging { - - /** - * Called when the leader information stored in zookeeper has changed. Try to elect as the leader - * - * @throws Exception - * On any error. - */ - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - - info("%s ZkElection listener fired for path %s with children %s".format(serverId, parentPath, curChilds.toString)) - - // Try to elect and act as a leader, if I am possibly - // already the leader then I need to do nothing - tryElectAndAct - } - } -} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index f2e1447..caddb06 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -23,17 +23,13 @@ import kafka.cluster.{Broker, Cluster} import scala.collection._ import java.util.Properties import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} -import kafka.server.KafkaServer -import kafka.consumer.{WildcardTopicCount, TopicCount} +import kafka.consumer.TopicCount object ZkUtils extends Logging { val ConsumersPath = "/consumers" - val ConsumerStatePath = "/consumers/state" - val BrokerCoordinationPath = "/brokers/coordinator" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" - /** * make sure a persistent path exists in ZK. Create the path if not exist. */ @@ -99,25 +95,6 @@ object ZkUtils extends Logging { } /** - * Create an persistent node with the given path and data. Create parents if necessary. - */ - def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = { - try { - client.createPersistent(path, data) - } - catch { - case e: ZkNoNodeException => { - createParentPath(client, path) - client.createPersistent(path, data) - } - } - } - - def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = { - client.createPersistentSequential(path, data) - } - - /** * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. */ @@ -157,7 +134,7 @@ object ZkUtils extends Logging { } } - def deletePath(client: ZkClient, path: String): Boolean = { + def deletePath(client: ZkClient, path: String) { try { client.delete(path) } @@ -165,7 +142,6 @@ object ZkUtils extends Logging { case e: ZkNoNodeException => // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") - false case e2 => throw e2 } } @@ -231,46 +207,6 @@ object ZkUtils extends Logging { cluster } - /** - * Get the interested topics of the group - * - * @param zkClient: Zookeeper client - * @param group: Name of the group - */ - def getTopicsForGroup(zkClient: ZkClient, group: String) : Set[String] = { - val consumersInGroup = getConsumersInGroup(zkClient, group) - var totalTopics: Set[String] = Set.empty[String] - for (consumer <- consumersInGroup) { - val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient) - val consumerTopics = topicCount.getConsumerThreadIdsPerTopic.map(_._1) - totalTopics ++= consumerTopics - } - - totalTopics - } - - /** - * Get the interested topics of the group. - * If one consumer in the group use a wildcard, then the whole group is marked as wildcard - * - * @param zkClient: Zookeeper client - * @param group: Name of the group - */ - - def getTopicsForGroupMaybeWildcard(zkClient: ZkClient, group: String) : (Set[String], Boolean) = { - var isWildcard : Boolean = false - val consumersInGroup = getConsumersInGroup(zkClient, group) - var totalTopics: Set[String] = Set.empty[String] - for (consumer <- consumersInGroup) { - val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient) - if (!isWildcard && topicCount.isInstanceOf[WildcardTopicCount]) isWildcard = true - val consumerTopics = topicCount.getConsumerThreadIdsPerTopic.map(_._1) - totalTopics ++= consumerTopics - } - - (totalTopics, isWildcard) - } - def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, List[String]] = { val ret = new mutable.HashMap[String, List[String]]() for (topic <- topics) { diff --git a/core/src/test/scala/other/kafka/TestZKElection.scala b/core/src/test/scala/other/kafka/TestZKElection.scala deleted file mode 100644 index 5cf2d44..0000000 --- a/core/src/test/scala/other/kafka/TestZKElection.scala +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka - -import org.I0Itec.zkclient.ZkClient -import utils.ZkElection - - -object TestZKElection { - - val zkConnect = "127.0.0.1:2181" - - def asLeader(name: String) = { - println(name + ": am the leader!") - } - - def main(args: Array[String]) { - if(args.length < 2) { - println("USAGE: " + TestZKElection.getClass.getName + " num_electors zk_path") - System.exit(1) - } - - val size = args(0).toInt - val path = args(1) - - val electorThreads : Array[ZkElection] = new Array[ZkElection](size) - - var curLeader = -1; - - // Start all the electors, let them elect - for(i <- 0 until size) { - val electThread = new ZkElection(new ZkClient(zkConnect, 1000, 1000), path, asLeader, i.toString) - //electorThreads(i) = electThread - electorThreads(i) = electThread - } - - // Check who is the leader now - for(i <- 0 until electorThreads.size) { - if (electorThreads(i).amLeader) { - - curLeader = i - } - else { - println(i + ": am not the leader, I think the leader is " + (if ( electorThreads(i).leaderId == "" ) - "... sorry I do not know since I am a dead man" - else - electorThreads(i).leaderId - )) - } - } - - // Do this 10 times - for (t <- 0 until 10) { - - // Shoot down current leader - println(curLeader + ": ah! I'm dead...") - electorThreads(curLeader).resign - - Thread.sleep(1500) - - // Check who is the leader now - var hasLeader = false - for(i <- 0 until electorThreads.size) { - if (electorThreads(i).amLeader) { - - curLeader = i - assert(!hasLeader, "Two leaders at the same time is not acceptable") - hasLeader = true - } - else { - println(i + ": am not the leader, I think the leader is " + (if ( electorThreads(i).leaderId == "" ) - "... sorry I do not know since I am a dead man" - else - electorThreads(i).leaderId - )) - } - } - - if (!hasLeader) { - println("No Leader elected: probably everyone is dead...") - return - } - } - } -} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ConsumerCoordinatorTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerCoordinatorTest.scala deleted file mode 100644 index f6e6318..0000000 --- a/core/src/test/scala/unit/kafka/server/ConsumerCoordinatorTest.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server - -import java.io.File -import kafka.api.FetchRequest -import kafka.producer.{SyncProducer, SyncProducerConfig} -import java.util.Properties -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} -import org.I0Itec.zkclient.ZkClient -import kafka.utils.ZkUtils._ -import kafka.utils._ -import junit.framework.Assert._ -import kafka.consumer.{Whitelist, TopicFilter, TopicCount, SimpleConsumer} - -class ConsumerCoordinatorTest extends JUnitSuite { - val brokerId1 = 1 - val brokerId2 = 2 - val brokerId3 = 3 - - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() - val port3 = TestUtils.choosePort() - - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) - - val zkConnect = TestZKUtils.zookeeperConnect - val zkSessionTimeoutMs = 1000 - val zkConnectionTimeoutMs = 1000 - - @Test - def testCoordinatorStartup() { - // Manually create some fake topics and consumer groups in the Zookeeper - var tmpZkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, zkConnectionTimeoutMs, ZKStringSerializer) - - makeSurePersistentPathExists(tmpZkClient, ZkUtils.BrokerTopicsPath) - - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.BrokerTopicsPath + "/topic1", "null") - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.BrokerTopicsPath + "/topic2", "null") - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.BrokerTopicsPath + "/topic3", "null") - - makeSurePersistentPathExists(tmpZkClient, ZkUtils.ConsumersPath) - makeSurePersistentPathExists(tmpZkClient, ZkUtils.ConsumersPath + "/group1/ids") - makeSurePersistentPathExists(tmpZkClient, ZkUtils.ConsumersPath + "/group2/ids") - makeSurePersistentPathExists(tmpZkClient, ZkUtils.ConsumersPath + "/group3/ids") - makeSurePersistentPathExists(tmpZkClient, ZkUtils.ConsumersPath + "/group4/ids") - - // Group 1: Consumer1 => {1}, Consumer2 => {1}, interested in {1} - // Group 1: Consumer1 => {1, 2}, Consumer2 => {1}, interested in {1, 2} - // Group 1: Consumer1 => {1, 2}, Consumer2 => {2, 3}, interested in {1, 2, 3} - - var topicCountMap1: scala.collection.Map[String, Int] = Map.empty[String, Int] - var topicCountMap12: scala.collection.Map[String, Int] = Map.empty[String, Int] - var topicCountMap23: scala.collection.Map[String, Int] = Map.empty[String, Int] - topicCountMap1 += ("topic1" -> 1) - topicCountMap12 += ("topic1" -> 1, "topic2" -> 2) - topicCountMap23 += ("topic2" -> 2, "topic3" -> 3) - - val topicCount1 = TopicCount.constructTopicCount("null", topicCountMap1) - val topicCount12 = TopicCount.constructTopicCount("null", topicCountMap12) - val topicCount23 = TopicCount.constructTopicCount("null", topicCountMap23) - val topicCountWildcard = TopicCount.constructTopicCount("null", new Whitelist("""[\p{Alnum}]+"""), 1, tmpZkClient) - - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group1/ids/consumer1", topicCount1.dbString) - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group1/ids/consumer2", topicCount1.dbString) - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group2/ids/consumer1", topicCount12.dbString) - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group2/ids/consumer2", topicCount1.dbString) - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group3/ids/consumer1", topicCount12.dbString) - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group3/ids/consumer2", topicCount23.dbString) - ZkUtils.createEphemeralPathExpectConflict(tmpZkClient, ZkUtils.ConsumersPath + "/group4/ids/consumer1", topicCountWildcard.dbString) - - // Start servers - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - - val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) - val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) - val server3 = TestUtils.createServer(new KafkaConfig(configProps3)) - - servers ++= List(server1, server2, server3) - - // Check that a unique coordinator has been elected - assertEquals("1 and 2 must be agreed on the coordinator", server1.consumerCoordinator.consumerCoordId, server2.consumerCoordinator.consumerCoordId) - assertEquals("1 and 3 must be agreed on the coordinator", server1.consumerCoordinator.consumerCoordId, server3.consumerCoordinator.consumerCoordId) - assertTrue("Coordinator must be one of the servers", server1.consumerCoordinator.consumerCoordId <= 3 && server1.consumerCoordinator.consumerCoordId >= 1) - - var curCoordId = server1.consumerCoordinator.consumerCoordId - var curCood = (if (curCoordId == 1) server1 else if (curCoordId == 2) server2 else server3) - - // Check that the coordinator have the correct interest group info - val topic1Interested: collection.mutable.Set[String] = collection.mutable.Set("group1", "group2", "group3", "group4") - val topic2Interested: collection.mutable.Set[String] = collection.mutable.Set("group2", "group3", "group4") - val topic3Interested: collection.mutable.Set[String] = collection.mutable.Set("group3", "group4") - val wildcardGroup = collection.mutable.Set("group4") - - assertEquals("All three groups should be interested in topic1", curCood.consumerCoordinator.consumerGroupsPerTopic("topic1"), topic1Interested) - assertEquals("Group 2, 3 and 4 should be interested in topic2", curCood.consumerCoordinator.consumerGroupsPerTopic("topic2"), topic2Interested) - assertEquals("Group 3 and 4 should be interested in topic3", curCood.consumerCoordinator.consumerGroupsPerTopic("topic3"), topic3Interested) - - assertEquals("The wildcard group should be group 4", curCood.consumerCoordinator.groupsWithWildcardTopics, wildcardGroup) - - Thread.sleep(1500) - - // Shoot down the coordinator, then restart it, check that a new elector is up and keep up with the interest group info - curCood.shutdown() - - Thread.sleep(1500) - - curCood.startup() - - assertEquals("1 and 2 must be agreed on the coordinator", server1.consumerCoordinator.consumerCoordId, server2.consumerCoordinator.consumerCoordId) - assertEquals("1 and 3 must be agreed on the coordinator", server1.consumerCoordinator.consumerCoordId, server3.consumerCoordinator.consumerCoordId) - assertTrue("Coordinator must be one of the servers", server1.consumerCoordinator.consumerCoordId <= 3 && server1.consumerCoordinator.consumerCoordId >= 1) - - curCoordId = server1.consumerCoordinator.consumerCoordId - curCood = (if (curCoordId == 1) server1 else if (curCoordId == 2) server2 else server3) - - // Check that the coordinator have the correct interest group info - assertEquals("All three groups should be interested in topic1", curCood.consumerCoordinator.consumerGroupsPerTopic("topic1"), topic1Interested) - assertEquals("Group 2 and 3 should be interested in topic2", curCood.consumerCoordinator.consumerGroupsPerTopic("topic2"), topic2Interested) - assertEquals("Group 3 should be interested in topic3", curCood.consumerCoordinator.consumerGroupsPerTopic("topic3"), topic3Interested) - - assertEquals("The wildcard group should be group 4", curCood.consumerCoordinator.groupsWithWildcardTopics, wildcardGroup) - - server1.shutdown() - server2.shutdown() - server3.shutdown() - } -} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4cfc0ea..25f6b49 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -304,5 +304,5 @@ object TestUtils { } object TestZKUtils { - val zookeeperConnect = "127.0.0.1:2181" + val zookeeperConnect = "127.0.0.1:2182" }