From 947bb9601a84f4f1006de992f924ac0b02e10f0e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 15 May 2015 11:09:07 -0700 Subject: [PATCH 1/3] v1 --- .../main/scala/kafka/server/DelayedOperation.scala | 44 +++++++++++++++++----- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2ed9b46..51da68b 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -19,11 +19,13 @@ package kafka.server import kafka.utils._ import kafka.utils.timer._ +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.metrics.KafkaMetricsGroup import java.util.LinkedList import java.util.concurrent._ import java.util.concurrent.atomic._ +import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.kafka.common.utils.Utils @@ -122,7 +124,10 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val timeoutTimer = new Timer(executor) /* a list of operation watching keys */ - private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + //private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key))) + + private val watchersLock = new ReentrantReadWriteLock() // the number of estimated total operations in the purgatory private[this] val estimatedTotalOperations = new AtomicInteger(0) @@ -217,7 +222,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * @return the number of completed operations during this process */ def checkAndComplete(key: Any): Int = { - val watchers = watchersForKey.get(key) + val watchers = inReadLock(watchersLock) { watchersForKey.get(key) } if(watchers == null) 0 else @@ -229,7 +234,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * on multiple lists, and some of its watched entries may still be in the watch lists * even when it has been completed, this number may be larger than the number of real operations watched */ - def watched() = watchersForKey.values.map(_.watched).sum + def watched() = allWatchers.map(_.watched).sum /** * Return the number of delayed operations in the expiry queue @@ -239,7 +244,22 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /* * Return the watch list of the given key */ - private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) + private def watchersFor(key: Any) = inReadLock(watchersLock) { watchersForKey.getAndMaybePut(key) } + + /* + * Return all the current watcher lists + */ + private def allWatchers = inReadLock(watchersLock) { watchersForKey.values } + + /* + * Remove the key from watcher lists if its list is empty + */ + private def removeKey(key: Any) = inWriteLock(watchersLock) { + val watchers = watchersForKey.get(key) + if (watchers != null && watchers.watched == 0) { + watchersForKey.remove(key) + } + } /** * Shutdown the expire reaper thread @@ -252,11 +272,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /** * A linked list of watched delayed operations based on some key */ - private class Watchers { + private class Watchers(val key: Any) { private[this] val operations = new LinkedList[T]() - def watched(): Int = operations synchronized operations.size + def watched: Int = operations synchronized operations.size // add the element to watch def watch(t: T) { @@ -266,8 +286,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // traverse the list and try to complete some watched elements def tryCompleteWatched(): Int = { + var completed = 0 operations synchronized { - var completed = 0 val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() @@ -279,8 +299,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br iter.remove() } } - completed + + if (operations.size == 0) + removeKey(key) } + completed } // traverse the list and purge elements that are already completed by others @@ -295,6 +318,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br purged += 1 } } + + if (operations.size == 0) + removeKey(key) } purged } @@ -319,7 +345,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum + val purged = allWatchers.map(_.purgeCompleted()).sum debug("Purged %d elements from watch lists.".format(purged)) } } -- 1.7.12.4 From 1776f23b8500cacaf3fd24a19b4c2c435ade98da Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 15 May 2015 13:37:45 -0700 Subject: [PATCH 2/3] v2 --- .../other/kafka/TestPurgatoryPerformance.scala | 25 ++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 39d6d8a..744be3b 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -36,32 +36,37 @@ object TestPurgatoryPerformance { def main(args: Array[String]): Unit = { val parser = new OptionParser + val keySpaceSizeOpt = parser.accepts("key-space-size", "The total number of possible keys") + .withRequiredArg + .describedAs("total_num_possible_keys") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) val numRequestsOpt = parser.accepts("num", "The number of requests") .withRequiredArg .describedAs("num_requests") .ofType(classOf[java.lang.Double]) - val requestRateOpt = parser.accepts("rate", "The request rate") + val requestRateOpt = parser.accepts("rate", "The request rate per second") .withRequiredArg .describedAs("request_per_second") .ofType(classOf[java.lang.Double]) - val requestDataSizeOpt = parser.accepts("size", "The request data size") + val requestDataSizeOpt = parser.accepts("size", "The request data size in bytes") .withRequiredArg .describedAs("num_bytes") .ofType(classOf[java.lang.Long]) - val numKeysOpt = parser.accepts("keys", "The number of keys") + val numKeysOpt = parser.accepts("keys", "The number of keys for each request") .withRequiredArg .describedAs("num_keys") .ofType(classOf[java.lang.Integer]) .defaultsTo(3) - val timeoutOpt = parser.accepts("timeout", "The request timeout") + val timeoutOpt = parser.accepts("timeout", "The request timeout in ms") .withRequiredArg .describedAs("timeout_milliseconds") .ofType(classOf[java.lang.Long]) - val pct75Opt = parser.accepts("pct75", "75th percentile of request latency (log-normal distribution)") + val pct75Opt = parser.accepts("pct75", "75th percentile of request latency in ms (log-normal distribution)") .withRequiredArg .describedAs("75th_percentile") .ofType(classOf[java.lang.Double]) - val pct50Opt = parser.accepts("pct50", "50th percentile of request latency (log-normal distribution)") + val pct50Opt = parser.accepts("pct50", "50th percentile of request latency in ms (log-normal distribution)") .withRequiredArg .describedAs("50th_percentile") .ofType(classOf[java.lang.Double]) @@ -78,6 +83,7 @@ object TestPurgatoryPerformance { val numRequests = options.valueOf(numRequestsOpt).intValue val requestRate = options.valueOf(requestRateOpt).doubleValue val requestDataSize = options.valueOf(requestDataSizeOpt).intValue + val numPossibleKeys = options.valueOf(keySpaceSizeOpt).intValue val numKeys = options.valueOf(numKeysOpt).intValue val timeout = options.valueOf(timeoutOpt).longValue val pct75 = options.valueOf(pct75Opt).doubleValue @@ -97,7 +103,8 @@ object TestPurgatoryPerformance { val initialCpuTimeNano = getProcessCpuTimeNanos(osMXBean) val latch = new CountDownLatch(numRequests) val start = System.currentTimeMillis - val keys = (0 until numKeys).map(i => "fakeKey%d".format(i)) + val rand = new Random() + val keys = (0 until numKeys).map(i => "fakeKey%d".format(rand.nextInt(numPossibleKeys))) @volatile var requestArrivalTime = start @volatile var end = 0L val generator = new Runnable { @@ -133,7 +140,7 @@ object TestPurgatoryPerformance { println("# enqueue rate (%d requests):".format(numRequests)) val gcCountHeader = gcNames.map("<" + _ + " count>").mkString(" ") val gcTimeHeader = gcNames.map("<" + _ + " time ms>").mkString(" ") - println("# %s %s".format(gcCountHeader, gcTimeHeader)) + println("# \t\t\t\t%s\t%s".format(gcCountHeader, gcTimeHeader)) } val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble @@ -143,7 +150,7 @@ object TestPurgatoryPerformance { val gcCounts = gcMXBeans.map(_.getCollectionCount) val gcTimes = gcMXBeans.map(_.getCollectionTime) - println("%d %f %f %d %s %s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1L), gcCounts.mkString(" "), gcTimes.mkString(" "))) + println("%d\t%f\t%f\t%d\t%s\t%s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1L), gcCounts.mkString(" "), gcTimes.mkString(" "))) purgatory.shutdown() } -- 1.7.12.4 From df6b701e8bf62fefad72c9c8fe2d1060bba6b94e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 18 May 2015 14:05:37 -0700 Subject: [PATCH 3/3] remove join purgatory --- .../kafka/coordinator/ConsumerCoordinator.scala | 93 +++++------ .../kafka/coordinator/ConsumerGroupMetadata.scala | 131 ++++++++++++++++ .../scala/kafka/coordinator/ConsumerMetadata.scala | 49 ++++++ .../kafka/coordinator/CoordinatorMetadata.scala | 10 +- .../scala/kafka/coordinator/DelayedHeartbeat.scala | 4 +- .../scala/kafka/coordinator/DelayedJoinGroup.scala | 39 ----- .../scala/kafka/coordinator/DelayedRebalance.scala | 2 +- core/src/main/scala/kafka/coordinator/Group.scala | 131 ---------------- .../scala/kafka/coordinator/HeartbeatBucket.scala | 43 ------ .../coordinator/ConsumerGroupMetadataTest.scala | 172 +++++++++++++++++++++ .../scala/unit/kafka/coordinator/GroupTest.scala | 172 --------------------- 11 files changed, 398 insertions(+), 448 deletions(-) create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala delete mode 100644 core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala delete mode 100644 core/src/main/scala/kafka/coordinator/Group.scala delete mode 100644 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala create mode 100644 core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala delete mode 100644 core/src/test/scala/unit/kafka/coordinator/GroupTest.scala diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 6f05488..af06ad4 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -48,7 +48,6 @@ class ConsumerCoordinator(val config: KafkaConfig, private val isActive = new AtomicBoolean(false) private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null - private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null private var coordinatorMetadata: CoordinatorMetadata = null @@ -63,7 +62,6 @@ class ConsumerCoordinator(val config: KafkaConfig, def startup() { info("Starting up.") heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId) - joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup]("JoinGroup", config.brokerId) rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId) coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance) isActive.set(true) @@ -79,7 +77,6 @@ class ConsumerCoordinator(val config: KafkaConfig, isActive.set(false) coordinatorMetadata.shutdown() heartbeatPurgatory.shutdown() - joinGroupPurgatory.shutdown() rebalancePurgatory.shutdown() info("Shutdown complete.") } @@ -113,7 +110,7 @@ class ConsumerCoordinator(val config: KafkaConfig, } } - private def doJoinGroup(group: Group, + private def doJoinGroup(group: ConsumerGroupMetadata, consumerId: String, topics: Set[String], sessionTimeoutMs: Int, @@ -154,14 +151,10 @@ class ConsumerCoordinator(val config: KafkaConfig, } } - consumer.awaitingRebalance = true - - val delayedJoinGroup = new DelayedJoinGroup(this, group, consumer, 2 * MaxSessionTimeoutMs, responseCallback) - val consumerGroupKey = ConsumerGroupKey(group.groupId) - joinGroupPurgatory.tryCompleteElseWatch(delayedJoinGroup, Seq(consumerGroupKey)) + consumer.awaitingRebalanceCallback = responseCallback if (group.is(PreparingRebalance)) - rebalancePurgatory.checkAndComplete(consumerGroupKey) + rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId)) } } } @@ -199,34 +192,36 @@ class ConsumerCoordinator(val config: KafkaConfig, /** * Complete existing DelayedHeartbeats for the given consumer and schedule the next one */ - private def completeAndScheduleNextHeartbeatExpiration(group: Group, consumer: Consumer) { + private def completeAndScheduleNextHeartbeatExpiration(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { + // complete current heartbeat expectation consumer.latestHeartbeat = SystemTime.milliseconds val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId) - // TODO: can we fix DelayedOperationPurgatory to remove keys in watchersForKey with empty watchers list? heartbeatPurgatory.checkAndComplete(consumerKey) - val heartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs - val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, heartbeatDeadline, consumer.sessionTimeoutMs) + + // reschedule the next heartbeat expiration deadline + val newHeartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs + val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, newHeartbeatDeadline, consumer.sessionTimeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey)) } private def addConsumer(consumerId: String, topics: Set[String], sessionTimeoutMs: Int, - group: Group) = { - val consumer = new Consumer(consumerId, group.groupId, topics, sessionTimeoutMs) + group: ConsumerGroupMetadata) = { + val consumer = new ConsumerMetadata(consumerId, group.groupId, topics, sessionTimeoutMs) val topicsToBind = topics -- group.topics group.add(consumer.consumerId, consumer) coordinatorMetadata.bindGroupToTopics(group.groupId, topicsToBind) consumer } - private def removeConsumer(group: Group, consumer: Consumer) { + private def removeConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { group.remove(consumer.consumerId) val topicsToUnbind = consumer.topics -- group.topics coordinatorMetadata.unbindGroupFromTopics(group.groupId, topicsToUnbind) } - private def updateConsumer(group: Group, consumer: Consumer, topics: Set[String]) { + private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) { val topicsToBind = topics -- group.topics group.remove(consumer.consumerId) val topicsToUnbind = consumer.topics -- group.topics @@ -235,14 +230,14 @@ class ConsumerCoordinator(val config: KafkaConfig, coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind) } - private def maybePrepareRebalance(group: Group) { + private def maybePrepareRebalance(group: ConsumerGroupMetadata) { group synchronized { if (group.canRebalance) prepareRebalance(group) } } - private def prepareRebalance(group: Group) { + private def prepareRebalance(group: ConsumerGroupMetadata) { group.transitionTo(PreparingRebalance) group.generationId += 1 info("Preparing to rebalance group %s generation %s".format(group.groupId, group.generationId)) @@ -253,7 +248,9 @@ class ConsumerCoordinator(val config: KafkaConfig, rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey)) } - private def rebalance(group: Group) { + private def rebalance(group: ConsumerGroupMetadata) { + assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata]) + group.transitionTo(Rebalancing) info("Rebalancing group %s generation %s".format(group.groupId, group.generationId)) @@ -263,11 +260,9 @@ class ConsumerCoordinator(val config: KafkaConfig, group.transitionTo(Stable) info("Stabilized group %s generation %s".format(group.groupId, group.generationId)) - val consumerGroupKey = ConsumerGroupKey(group.groupId) - joinGroupPurgatory.checkAndComplete(consumerGroupKey) } - private def onConsumerHeartbeatExpired(group: Group, consumer: Consumer) { + private def onConsumerHeartbeatExpired(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId)) removeConsumer(group, consumer) maybePrepareRebalance(group) @@ -275,7 +270,7 @@ class ConsumerCoordinator(val config: KafkaConfig, private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) - private def reassignPartitions(group: Group) = { + private def reassignPartitions(group: ConsumerGroupMetadata) = { val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy) val topicsPerConsumer = group.topicsPerConsumer val partitionsPerTopic = coordinatorMetadata.partitionsPerTopic @@ -286,31 +281,9 @@ class ConsumerCoordinator(val config: KafkaConfig, assignedPartitionsPerConsumer } - def tryCompleteJoinGroup(group: Group, forceComplete: () => Boolean) = { - group synchronized { - if (group.is(Stable)) - forceComplete() - else false - } - } - - def onExpirationJoinGroup() { - throw new IllegalStateException("DelayedJoinGroup should never expire") - } - - def onCompleteJoinGroup(group: Group, - consumer: Consumer, - responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) { - group synchronized { - consumer.awaitingRebalance = false - completeAndScheduleNextHeartbeatExpiration(group, consumer) - responseCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code) - } - } - - def tryCompleteRebalance(group: Group, forceComplete: () => Boolean) = { + def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = { group synchronized { - if (group.allConsumersRejoined) + if (group.notYetRejoinedConsumers == List.empty[ConsumerMetadata]) forceComplete() else false } @@ -320,7 +293,7 @@ class ConsumerCoordinator(val config: KafkaConfig, // TODO: add metrics for rebalance timeouts } - def onCompleteRebalance(group: Group) { + def onCompleteRebalance(group: ConsumerGroupMetadata) { group synchronized { val failedConsumers = group.notYetRejoinedConsumers if (group.isEmpty || !failedConsumers.isEmpty) { @@ -335,12 +308,22 @@ class ConsumerCoordinator(val config: KafkaConfig, coordinatorMetadata.removeGroup(group.groupId, group.topics) } } - if (!group.is(Dead)) + if (!group.is(Dead)) { + // assign partitions to existing consumers of the group according to the partitioning strategy rebalance(group) + + // trigger the awaiting join group response callback for all the consumers after rebalancing + for (consumer <- group.allConsumers) { + assert(consumer.awaitingRebalanceCallback != null) + consumer.awaitingRebalanceCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code) + consumer.awaitingRebalanceCallback = null + completeAndScheduleNextHeartbeatExpiration(group, consumer) + } + } } } - def tryCompleteHeartbeat(group: Group, consumer: Consumer, heartbeatDeadline: Long, forceComplete: () => Boolean) = { + def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = { group synchronized { if (shouldKeepConsumerAlive(consumer, heartbeatDeadline)) forceComplete() @@ -348,7 +331,7 @@ class ConsumerCoordinator(val config: KafkaConfig, } } - def onExpirationHeartbeat(group: Group, consumer: Consumer, heartbeatDeadline: Long) { + def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) { group synchronized { if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline)) onConsumerHeartbeatExpired(group, consumer) @@ -357,6 +340,6 @@ class ConsumerCoordinator(val config: KafkaConfig, def onCompleteHeartbeat() {} - private def shouldKeepConsumerAlive(consumer: Consumer, heartbeatDeadline: Long) = - consumer.awaitingRebalance || consumer.latestHeartbeat > heartbeatDeadline - consumer.sessionTimeoutMs + private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) = + consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline } diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala new file mode 100644 index 0000000..47bdfa7 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.utils.nonthreadsafe + +import java.util.UUID + +import collection.mutable + +private[coordinator] sealed trait GroupState { def state: Byte } + +/** + * Consumer group is preparing to rebalance + * + * action: respond to heartbeats with an ILLEGAL GENERATION error code + * transition: some consumers have joined by the timeout => Rebalancing + * all consumers have left the group => Dead + */ +private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 } + +/** + * Consumer group is rebalancing + * + * action: compute the group's partition assignment + * send the join-group response with new partition assignment when rebalance is complete + * transition: partition assignment has been computed => Stable + */ +private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 } + +/** + * Consumer group is stable + * + * action: respond to consumer heartbeats normally + * transition: consumer failure detected via heartbeat => PreparingRebalance + * consumer join-group received => PreparingRebalance + * zookeeper topic watcher fired => PreparingRebalance + */ +private[coordinator] case object Stable extends GroupState { val state: Byte = 3 } + +/** + * Consumer group has no more members + * + * action: none + * transition: none + */ +private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } + + +/** + * Group contains the following metadata: + * + * Membership metadata: + * 1. Consumers registered in this group + * 2. Partition assignment strategy for this group + * + * State metadata: + * 1. group state + * 2. generation id + */ +@nonthreadsafe +private[coordinator] class ConsumerGroupMetadata(val groupId: String, + val partitionAssignmentStrategy: String) { + + private val validPreviousStates: Map[GroupState, Set[GroupState]] = + Map(Dead -> Set(PreparingRebalance), + Stable -> Set(Rebalancing), + PreparingRebalance -> Set(Stable), + Rebalancing -> Set(PreparingRebalance)) + + private val consumers = new mutable.HashMap[String, ConsumerMetadata] + private var state: GroupState = Stable + var generationId = 0 + + def is(groupState: GroupState) = state == groupState + def has(consumerId: String) = consumers.contains(consumerId) + def get(consumerId: String) = consumers(consumerId) + + def add(consumerId: String, consumer: ConsumerMetadata) { + consumers.put(consumerId, consumer) + } + + def remove(consumerId: String) { + consumers.remove(consumerId) + } + + def isEmpty = consumers.isEmpty + + def topicsPerConsumer = consumers.mapValues(_.topics).toMap + + def topics = consumers.values.flatMap(_.topics).toSet + + def notYetRejoinedConsumers = consumers.values.filter(_.awaitingRebalanceCallback == null).toList + + def allConsumers = consumers.values.toList + + def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) => + timeout.max(consumer.sessionTimeoutMs) + } + + // TODO: decide if ids should be predictable or random + def generateNextConsumerId = UUID.randomUUID().toString + + def canRebalance = state == Stable + + def transitionTo(groupState: GroupState) { + assertValidTransition(groupState) + state = groupState + } + + private def assertValidTransition(targetState: GroupState) { + if (!validPreviousStates(targetState).contains(state)) + throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" + .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state)) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala new file mode 100644 index 0000000..d5486cf --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.common.TopicAndPartition +import kafka.utils.nonthreadsafe + +/** + * Consumer metadata contains the following metadata: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Subscription metadata: + * 1. subscribed topics + * 2. assigned partitions for the subscribed topics + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the consumer group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * consumer has sent the join group request + */ +@nonthreadsafe +private[coordinator] class ConsumerMetadata(val consumerId: String, + val groupId: String, + var topics: Set[String], + val sessionTimeoutMs: Int) { + + var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null + var assignedTopicPartitions = Set.empty[TopicAndPartition] + var latestHeartbeat: Long = -1 +} diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index 88e82b6..c39e6de 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -34,7 +34,7 @@ import scala.collection.mutable @threadsafe private[coordinator] class CoordinatorMetadata(config: KafkaConfig, zkClient: ZkClient, - maybePrepareRebalance: Group => Unit) { + maybePrepareRebalance: ConsumerGroupMetadata => Unit) { /** * NOTE: If a group lock and coordinatorLock are simultaneously needed, @@ -45,7 +45,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, /** * These should be guarded by metadataLock */ - private val groups = new mutable.HashMap[String, Group] + private val groups = new mutable.HashMap[String, ConsumerGroupMetadata] private val groupsPerTopic = new mutable.HashMap[String, Set[String]] private val topicPartitionCounts = new mutable.HashMap[String, Int] private val topicPartitionChangeListeners = new mutable.HashMap[String, TopicPartitionChangeListener] @@ -80,7 +80,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, */ def addGroup(groupId: String, partitionAssignmentStrategy: String) = { inWriteLock(metadataLock) { - groups.getOrElseUpdate(groupId, new Group(groupId, partitionAssignmentStrategy)) + groups.getOrElseUpdate(groupId, new ConsumerGroupMetadata(groupId, partitionAssignmentStrategy)) } } @@ -195,7 +195,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, topicPartitionCounts.put(topic, numPartitions) groupsPerTopic(topic).map(groupId => groups(groupId)) } - else Set.empty[Group] + else Set.empty[ConsumerGroupMetadata] } groupsToRebalance.foreach(maybePrepareRebalance) } @@ -212,7 +212,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, topicPartitionCounts.put(topic, 0) groupsPerTopic(topic).map(groupId => groups(groupId)) } - else Set.empty[Group] + else Set.empty[ConsumerGroupMetadata] } groupsToRebalance.foreach(maybePrepareRebalance) } diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala index b3360cc..70a710c 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala @@ -24,8 +24,8 @@ import kafka.server.DelayedOperation * Heartbeats are paused during rebalance. */ private[coordinator] class DelayedHeartbeat(consumerCoordinator: ConsumerCoordinator, - group: Group, - consumer: Consumer, + group: ConsumerGroupMetadata, + consumer: ConsumerMetadata, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala deleted file mode 100644 index 8f57d38..0000000 --- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.common.TopicAndPartition -import kafka.server.DelayedOperation - -/** - * Delayed join-group operations that are kept in the purgatory before the partition assignment completed - * - * These operation should never expire; when the rebalance has completed, all consumer's - * join-group operations will be completed by sending back the response with the - * calculated partition assignment. - */ -private[coordinator] class DelayedJoinGroup(consumerCoordinator: ConsumerCoordinator, - group: Group, - consumer: Consumer, - sessionTimeout: Long, - responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) - extends DelayedOperation(sessionTimeout) { - override def tryComplete(): Boolean = consumerCoordinator.tryCompleteJoinGroup(group, forceComplete) - override def onExpiration() = consumerCoordinator.onExpirationJoinGroup() - override def onComplete() = consumerCoordinator.onCompleteJoinGroup(group, consumer, responseCallback) -} diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala index 689621c..8247d33 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala @@ -30,7 +30,7 @@ import kafka.server.DelayedOperation * the rest of the group. */ private[coordinator] class DelayedRebalance(consumerCoordinator: ConsumerCoordinator, - group: Group, + group: ConsumerGroupMetadata, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { diff --git a/core/src/main/scala/kafka/coordinator/Group.scala b/core/src/main/scala/kafka/coordinator/Group.scala deleted file mode 100644 index 048eeee..0000000 --- a/core/src/main/scala/kafka/coordinator/Group.scala +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.utils.nonthreadsafe - -import java.util.UUID - -import collection.mutable - -private[coordinator] sealed trait GroupState { def state: Byte } - -/** - * Consumer group is preparing to rebalance - * - * action: respond to heartbeats with an ILLEGAL GENERATION error code - * transition: some consumers have joined by the timeout => Rebalancing - * all consumers have left the group => Dead - */ -private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 } - -/** - * Consumer group is rebalancing - * - * action: compute the group's partition assignment - * send the join-group response with new partition assignment when rebalance is complete - * transition: partition assignment has been computed => Stable - */ -private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 } - -/** - * Consumer group is stable - * - * action: respond to consumer heartbeats normally - * transition: consumer failure detected via heartbeat => PreparingRebalance - * consumer join-group received => PreparingRebalance - * zookeeper topic watcher fired => PreparingRebalance - */ -private[coordinator] case object Stable extends GroupState { val state: Byte = 3 } - -/** - * Consumer group has no more members - * - * action: none - * transition: none - */ -private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } - - -/** - * A group contains the following metadata: - * - * Membership metadata: - * 1. Consumers registered in this group - * 2. Partition assignment strategy for this group - * - * State metadata: - * 1. group state - * 2. generation id - */ -@nonthreadsafe -private[coordinator] class Group(val groupId: String, - val partitionAssignmentStrategy: String) { - - private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(PreparingRebalance), - Stable -> Set(Rebalancing), - PreparingRebalance -> Set(Stable), - Rebalancing -> Set(PreparingRebalance)) - - private val consumers = new mutable.HashMap[String, Consumer] - private var state: GroupState = Stable - var generationId = 0 - - def is(groupState: GroupState) = state == groupState - def has(consumerId: String) = consumers.contains(consumerId) - def get(consumerId: String) = consumers(consumerId) - - def add(consumerId: String, consumer: Consumer) { - consumers.put(consumerId, consumer) - } - - def remove(consumerId: String) { - consumers.remove(consumerId) - } - - def isEmpty = consumers.isEmpty - - def topicsPerConsumer = consumers.mapValues(_.topics).toMap - - def topics = consumers.values.flatMap(_.topics).toSet - - def allConsumersRejoined = consumers.values.forall(_.awaitingRebalance) - - def notYetRejoinedConsumers = consumers.values.filter(!_.awaitingRebalance).toList - - def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) => - timeout.max(consumer.sessionTimeoutMs) - } - - // TODO: decide if ids should be predictable or random - def generateNextConsumerId = UUID.randomUUID().toString - - def canRebalance = state == Stable - - def transitionTo(groupState: GroupState) { - assertValidTransition(groupState) - state = groupState - } - - private def assertValidTransition(targetState: GroupState) { - if (!validPreviousStates(targetState).contains(state)) - throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" - .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state)) - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala deleted file mode 100644 index b6b9f5f..0000000 --- a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.common.TopicAndPartition -import kafka.utils.nonthreadsafe - -/** - * A consumer contains the following metadata: - * - * Heartbeat metadata: - * 1. negotiated heartbeat session timeout - * 2. timestamp of the latest heartbeat - * - * Subscription metadata: - * 1. subscribed topics - * 2. assigned partitions for the subscribed topics - */ -@nonthreadsafe -private[coordinator] class Consumer(val consumerId: String, - val groupId: String, - var topics: Set[String], - val sessionTimeoutMs: Int) { - - var awaitingRebalance = false - var assignedTopicPartitions = Set.empty[TopicAndPartition] - var latestHeartbeat: Long = -1 -} diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala new file mode 100644 index 0000000..b69c993 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import junit.framework.Assert._ +import org.junit.{Before, Test} +import org.scalatest.junit.JUnitSuite + +/** + * Test group state transitions + */ +class ConsumerGroupMetadataTest extends JUnitSuite { + var group: ConsumerGroupMetadata = null + + @Before + def setUp() { + group = new ConsumerGroupMetadata("test", "range") + } + + @Test + def testCanRebalanceWhenStable() { + assertTrue(group.canRebalance) + } + + @Test + def testCannotRebalanceWhenPreparingRebalance() { + group.transitionTo(PreparingRebalance) + assertFalse(group.canRebalance) + } + + @Test + def testCannotRebalanceWhenRebalancing() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Rebalancing) + assertFalse(group.canRebalance) + } + + @Test + def testCannotRebalanceWhenDead() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + assertFalse(group.canRebalance) + } + + @Test + def testStableToPreparingRebalanceTransition() { + group.transitionTo(PreparingRebalance) + assertState(group, PreparingRebalance) + } + + @Test + def testPreparingRebalanceToRebalancingTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Rebalancing) + assertState(group, Rebalancing) + } + + @Test + def testPreparingRebalanceToDeadTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + assertState(group, Dead) + } + + @Test + def testRebalancingToStableTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Rebalancing) + group.transitionTo(Stable) + assertState(group, Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testStableToStableIllegalTransition() { + group.transitionTo(Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testStableToRebalancingIllegalTransition() { + group.transitionTo(Rebalancing) + } + + @Test(expected = classOf[IllegalStateException]) + def testStableToDeadIllegalTransition() { + group.transitionTo(Dead) + } + + @Test(expected = classOf[IllegalStateException]) + def testPreparingRebalanceToPreparingRebalanceIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(PreparingRebalance) + } + + @Test(expected = classOf[IllegalStateException]) + def testPreparingRebalanceToStableIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testRebalancingToRebalancingIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Rebalancing) + group.transitionTo(Rebalancing) + } + + @Test(expected = classOf[IllegalStateException]) + def testRebalancingToPreparingRebalanceTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Rebalancing) + group.transitionTo(PreparingRebalance) + } + + @Test(expected = classOf[IllegalStateException]) + def testRebalancingToDeadIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Rebalancing) + group.transitionTo(Dead) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToDeadIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(Dead) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToStableIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToPreparingRebalanceIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(PreparingRebalance) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToRebalancingIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(Rebalancing) + } + + private def assertState(group: ConsumerGroupMetadata, targetState: GroupState) { + val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead) + val otherStates = states - targetState + otherStates.foreach { otherState => + assertFalse(group.is(otherState)) + } + assertTrue(group.is(targetState)) + } +} diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala deleted file mode 100644 index 6561a1d..0000000 --- a/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import junit.framework.Assert._ -import org.junit.{Before, Test} -import org.scalatest.junit.JUnitSuite - -/** - * Test group state transitions - */ -class GroupTest extends JUnitSuite { - var group: Group = null - - @Before - def setUp() { - group = new Group("test", "range") - } - - @Test - def testCanRebalanceWhenStable() { - assertTrue(group.canRebalance) - } - - @Test - def testCannotRebalanceWhenPreparingRebalance() { - group.transitionTo(PreparingRebalance) - assertFalse(group.canRebalance) - } - - @Test - def testCannotRebalanceWhenRebalancing() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - assertFalse(group.canRebalance) - } - - @Test - def testCannotRebalanceWhenDead() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - assertFalse(group.canRebalance) - } - - @Test - def testStableToPreparingRebalanceTransition() { - group.transitionTo(PreparingRebalance) - assertState(group, PreparingRebalance) - } - - @Test - def testPreparingRebalanceToRebalancingTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - assertState(group, Rebalancing) - } - - @Test - def testPreparingRebalanceToDeadTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - assertState(group, Dead) - } - - @Test - def testRebalancingToStableTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(Stable) - assertState(group, Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testStableToStableIllegalTransition() { - group.transitionTo(Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testStableToRebalancingIllegalTransition() { - group.transitionTo(Rebalancing) - } - - @Test(expected = classOf[IllegalStateException]) - def testStableToDeadIllegalTransition() { - group.transitionTo(Dead) - } - - @Test(expected = classOf[IllegalStateException]) - def testPreparingRebalanceToPreparingRebalanceIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(PreparingRebalance) - } - - @Test(expected = classOf[IllegalStateException]) - def testPreparingRebalanceToStableIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testRebalancingToRebalancingIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(Rebalancing) - } - - @Test(expected = classOf[IllegalStateException]) - def testRebalancingToPreparingRebalanceTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(PreparingRebalance) - } - - @Test(expected = classOf[IllegalStateException]) - def testRebalancingToDeadIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(Dead) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToDeadIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(Dead) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToStableIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToPreparingRebalanceIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(PreparingRebalance) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToRebalancingIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(Rebalancing) - } - - private def assertState(group: Group, targetState: GroupState) { - val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead) - val otherStates = states - targetState - otherStates.foreach { otherState => - assertFalse(group.is(otherState)) - } - assertTrue(group.is(targetState)) - } -} -- 1.7.12.4