From 67dd5b25a2d0196534c0bb92bea5f763857dcec7 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Ritschard Date: Fri, 27 Jan 2012 22:31:48 +0100 Subject: [PATCH] Incorporate Jun Rao's comments on KAFKA-251 --- .../consumer/ZookeeperConsumerConnector.scala | 120 +++++++++++--------- 1 files changed, 65 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 7b9804f..15c18aa 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -81,9 +81,37 @@ trait ZookeeperConsumerConnectorMBean { def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long } + +trait ZookeeperConsumerTopicPartitionMBean { + def getGroupId: String + def getPartition: String + def getTopic: String + def getFetchOffset: Long + def getConsumeOffset: Long + def getConsumeLag: Long +} + +private[kafka] class ZookeeperConsumerTopicPartition(val topic: String, + val info: PartitionTopicInfo, + val groupId: String) + extends ZookeeperConsumerTopicPartitionMBean { + + def getGroupId: String = groupId + + def getTopic: String = topic + + def getPartition: String = info.partition.name + + def getFetchOffset: Long = info.getFetchOffset + + def getConsumeOffset: Long = info.getConsumeOffset + + def getConsumeLag: Long = info.getFetchOffset - info.getConsumeOffset +} + private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only - extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging { + extends ConsumerConnector with Logging { private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object @@ -252,59 +280,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - // for JMX - def getPartOwnerStats(): String = { - val builder = new StringBuilder - for ((topic, infos) <- topicRegistry) { - builder.append("\n" + topic + ": [") - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - for(partition <- infos.values) { - builder.append("\n {") - builder.append{partition.partition.name} - builder.append(",fetch offset:" + partition.getFetchOffset) - builder.append(",consumer offset:" + partition.getConsumeOffset) - builder.append("}") - } - builder.append("\n ]") - } - builder.toString - } - - // for JMX - def getConsumerGroup(): String = config.groupId - - def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long = - getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId) - - def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = { - val partition = new Partition(brokerId, partitionId) - val partitionInfos = topicRegistry.get(topic) - if (partitionInfos != null) { - val partitionInfo = partitionInfos.get(partition) - if (partitionInfo != null) - return partitionInfo.getConsumeOffset - } - - //otherwise, try to get it from zookeeper - try { - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - val znode = topicDirs.consumerOffsetDir + "/" + partition.name - val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode) - if (offsetString != null) - return offsetString.toLong - else - return -1 - } - catch { - case e => - error("error in getConsumedOffset JMX ", e) - } - return -2 - } - - def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = - earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime) - private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L @@ -427,7 +402,40 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, oldPartitionsPerTopicMap.clear } + + private def getMBeanName(topic: String, partition: String, groupId: String) : String = { + String.format("kafka:type=kafka.ConsumerPartitionTopicStats,topic=%s,partition=%s,groupid=%s", + topic, + partition, + groupId) + } + + def registerMBeans() { + for ((topic, infos) <- topicRegistry) { + for (partinfo <- infos.values) { + val mbeanName = getMBeanName(topic, partinfo.partition.name, config.groupId) + val consumerTopicMBean = new ZookeeperConsumerTopicPartition(topic, partinfo, config.groupId) + + info("registering MBean" + mbeanName) + Utils.registerMBean(consumerTopicMBean, mbeanName) + } + } + info("done") + } + + def unregisterMBeans() { + for ((topic, infos) <- topicRegistry) { + for (partinfo <- infos.values) { + val mbeanName = getMBeanName(topic, partinfo.partition.name, config.groupId) + + info("unregistering MBean") + Utils.unregisterMBean(mbeanName) + } + } + } + def syncedRebalance() { + unregisterMBeans() rebalanceLock synchronized { for (i <- 0 until config.maxRebalanceRetries) { info("begin rebalancing consumer " + consumerIdString + " try #" + i) @@ -443,8 +451,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, info("exception during rebalance ", e) } info("end rebalancing consumer " + consumerIdString + " try #" + i) - if (done) + if (done) { + registerMBeans() return + } // release all partitions, reset state and retry releasePartitionOwnership() Thread.sleep(config.rebalanceBackoffMs) -- 1.7.5.4