From bd66dff2a50e3c25010d1f64a9f924cad04e94e0 Mon Sep 17 00:00:00 2001 From: mgharat Date: Mon, 15 Sep 2014 15:12:41 -0700 Subject: [PATCH 1/2] SRE Offset tool + Offset Client --- config/consumer.properties | 3 + core/src/main/scala/kafka/tools/OffsetClient.scala | 270 +++++++++++++++++++++ .../src/main/scala/kafka/tools/SreOffsetTool.scala | 128 ++++++++++ 3 files changed, 401 insertions(+) create mode 100644 core/src/main/scala/kafka/tools/OffsetClient.scala create mode 100644 core/src/main/scala/kafka/tools/SreOffsetTool.scala diff --git a/config/consumer.properties b/config/consumer.properties index 83847de..2fc7fb7 100644 --- a/config/consumer.properties +++ b/config/consumer.properties @@ -27,3 +27,6 @@ group.id=test-consumer-group #consumer timeout #consumer.timeout.ms=5000 + +#offsetStorage +offsets.storage=kafka diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala new file mode 100644 index 0000000..c62a31c --- /dev/null +++ b/core/src/main/scala/kafka/tools/OffsetClient.scala @@ -0,0 +1,270 @@ +package kafka.tools + +import java.io.IOException +import java.util.concurrent.TimeUnit + +import kafka.api._ +import kafka.common.{OffsetAndMetadata, TopicAndPartition, ErrorMapping} +import kafka.consumer.ConsumerConfig +import kafka.network.BlockingChannel +import kafka.utils.Logging +import org.I0Itec.zkclient.ZkClient +import kafka.cluster.{Broker, Cluster} + +import scala.util.Random + + +/** + * Created by mgharat on 9/11/14. + */ +object OffsetClient extends Logging { + + private def getQueryChannel(config: ConsumerConfig, brokers: Seq[Broker]): BlockingChannel = { + var channel: BlockingChannel = null + var connected = false + // val shuffledBrokers = Random.shuffle(brokers) + var i: Int = 0 + + while ( /*i < shuffledBrokers.size &&*/ !connected) { + // val broker = shuffledBrokers(i) + // i = i + 1 + val shuffledBrokers = Random.shuffle(brokers) + val broker = shuffledBrokers(0) + trace("Connecting to the broker %s:%d.".format(broker.host, broker.port)) + try { + channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.socketTimeoutMs) + debug("Created channel to broker %s:%d.".format(channel.host, channel.port)) + true + } + catch { + case e: Exception => + if (channel != null) channel.disconnect() + info("Error while creating channel to %s:%d.".format(broker.host, broker.port)) + false + } + connected = if (channel == null) false else true + } + channel.connect() + + channel + + } + + private def getOffsetManagerChannel(config: ConsumerConfig, /*brokerMap: collection.Map[Int, Broker]*/ brokers: Seq[Broker]): Option[BlockingChannel] = { + var offSetManagerChannel: BlockingChannel = null + + var queryChannel = getQueryChannel(config, /*brokerMap.values.toSeq*/ brokers) + + var offSetManagerChannelOpt: Option[BlockingChannel] = None + + while (!offSetManagerChannelOpt.isDefined) { + + var coordinatorOpt: Option[Broker] = None + + while (!coordinatorOpt.isDefined) { + try { + if (!queryChannel.isConnected) { + queryChannel = getQueryChannel(config, /*brokerMap.values.toSeq*/ brokers) + } + debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, config.groupId)) + queryChannel.send(ConsumerMetadataRequest(config.groupId)) + val response = queryChannel.receive() + val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) + debug("Consumer metadata response: " + 
consumerMetadataResponse.toString) + if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) { + coordinatorOpt = consumerMetadataResponse.coordinatorOpt + } + else { + debug("Query to %s:%d to locate offset manager for %s failed - will retry in % milliseconds." + .format(queryChannel.host, queryChannel.port, config.groupId, config.offsetsChannelBackoffMs)) + Thread.sleep(config.offsetsChannelBackoffMs) + } + } + catch { + case ioe: IOException => + info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port)) + queryChannel.disconnect() + } + } + + val coordinator = coordinatorOpt.get + if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) { + offSetManagerChannelOpt = Some(queryChannel) + } + else { + val connectString = "%s:%d".format(coordinator.host, coordinator.port) + try { + offSetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, config.socketTimeoutMs) + offSetManagerChannel.connect() + offSetManagerChannelOpt = Some(offSetManagerChannel) + queryChannel.disconnect() + } + catch { + case ioe: IOException => + info("Error while connecting to %s.".format(connectString)) + if (offSetManagerChannel != null) offSetManagerChannel.disconnect() + TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) + offSetManagerChannelOpt = None + } + } + } + + offSetManagerChannelOpt + } + + private def fetchTopicMetadata(config: ConsumerConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = { + var topicAndPartitions = List[TopicAndPartition]() + var fetchMetadataSucceded = false + var i: Int = 0 + var topicMetadataChannel = getQueryChannel(config, brokers) + var topicMetaDataResponse: TopicMetadataResponse = null + + while (!fetchMetadataSucceded) { + try { + if (!topicMetadataChannel.isConnected) { + topicMetadataChannel = getQueryChannel(config, brokers) + } + info("Fetching metadata from broker %s:%d".format(topicMetadataChannel.host, topicMetadataChannel.port)) + val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics) + topicMetadataChannel.send(topicMetaDataRequest) + topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer) + fetchMetadataSucceded = true + } + catch { + case e: Exception => + warn("Fetching metadata for topics [%s] from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port)) + topicMetadataChannel.disconnect() + } + } + + topicMetaDataResponse + } + + + def fetch(config: ConsumerConfig, /*brokerConfig: Seq[BrokerConfig]*/ brokers: Seq[Broker], topics: Seq[String]): Option[OffsetFetchResponse] = { + // val brokers = brokerConfig.map(config => new Broker(config.brokerId, config.hostName, config.port)) + var topicAndPartition = collection.Seq[TopicAndPartition]() + + if (topics.size > 0) { + val topicMetaDataResponse: TopicMetadataResponse = fetchTopicMetadata(config, brokers, topics) + topicAndPartition = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => { + val topic = topicMetaData.topic + val topicPartitions = topicMetaData.partitionsMetadata.map(partitionMetadata => { + new TopicAndPartition(topic, partitionMetadata.partitionId) + } + ) + topicPartitions + } + ) + } + + val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = topicAndPartition, clientId = config.clientId) + var offsetFetchResponseOpt: 
Option[OffsetFetchResponse] = None + while (!offsetFetchResponseOpt.isDefined) { + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get + try { + offsetManagerChannel.send(offsetFetchRequest) + val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset fetch response: %s.".format(offsetFetchResponse)) + + val (leaderChanged, loadInProgress) = + offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) => + (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode), + folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode)) + } + + if (leaderChanged) { + offsetManagerChannel.disconnect() + debug("Could not fetch offsets (because offset manager has moved).") + } + else if (loadInProgress) { + debug("Could not fetch offsets (because offset cache is being loaded).") + } + else { + offsetFetchResponseOpt = Some(offsetFetchResponse) + } + } + catch { + case e: Exception => + warn("Error while fetching offsets from %s:%d. Possible cause: %s".format(offsetManagerChannel.host, offsetManagerChannel.port, e.getMessage)) + } + + if (offsetFetchResponseOpt.isEmpty) { + debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs)) + TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) + } + } + + offsetFetchResponseOpt + } + + def commit(config: ConsumerConfig, brokers: Seq[Broker], offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata]): Boolean = { + var done = false + var retriesRemaining = 1 + config.offsetsCommitMaxRetries + var committed = false + + if (offsetsToCommit.size > 0) { + val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId) + + while (!done) { + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get + try { + offsetManagerChannel.send(offsetCommitRequest) + val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset commit response: %s.".format(offsetCommitResponse)) + + val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { + offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => + + (folded._1 || //update commitFailed + errorCode != ErrorMapping.NoError, + + folded._2 || //update retryableIfFailed - (only metadata too large is not retryable) + (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), + + folded._3 || //update shouldRefreshCoordinator + errorCode == ErrorMapping.NotCoordinatorForConsumerCode || + errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, + + //update error count + folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) + } + } + debug(errorCount + " errors in offset commit response.") + + if (shouldRefreshCoordinator) { + debug("Could not commit offsets (because offset coordinator has moved or is unavailable).") + offsetManagerChannel.disconnect() + } + + if (commitFailed && retryableIfFailed) { + committed = false + } + else { + committed = true + } + } + catch { + case t: Throwable => + error("Error while commiting offsets.", t) + offsetManagerChannel.disconnect() + } + + done = { + retriesRemaining -= 1 + retriesRemaining == 0 || committed + } + + if (!done) { + debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs)) + 
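// back off before the next commit attempt so failed commits are not retried in a tight loop +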
TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) + } + } + } + + committed + } +} + + diff --git a/core/src/main/scala/kafka/tools/SreOffsetTool.scala b/core/src/main/scala/kafka/tools/SreOffsetTool.scala new file mode 100644 index 0000000..ccc524d --- /dev/null +++ b/core/src/main/scala/kafka/tools/SreOffsetTool.scala @@ -0,0 +1,128 @@ +package kafka.tools + +import java.io.{FileWriter, BufferedWriter, File} +import joptsimple.OptionParser +import kafka.api.{OffsetCommitRequest, OffsetFetchResponse} +import kafka.cluster.Broker +import kafka.common.{OffsetAndMetadata, TopicAndPartition} +import kafka.consumer.ConsumerConfig +import kafka.utils.{Utils, Logging} + +import scala.io.Source + + +/** + * Created by mgharat on 9/10/14. + */ +object SreOffsetTool extends Logging { + + def main(args: Array[String]): Unit = { + info("Starting SreOffsetTool") + + val parser = new OptionParser + + val topicList = parser.accepts("topics", "Topics for which offsets/Partitions should be fetched") + .withRequiredArg() + .ofType(classOf[String]) + + val brokerList = parser.accepts("broker-list", "List of brokers") + .withRequiredArg() + .ofType(classOf[String]) + + val filePathArg = parser.accepts("filepath", "Path to the read/write file") + .withRequiredArg() + .ofType(classOf[String]) + + val readWrite = parser.accepts("readWrite", "Read write flag, 0:Read, 1:Write") + .withRequiredArg() + .ofType(classOf[Integer]) + + val consumerConfigFile = parser.accepts("config-file", "Consumer config file") + .withRequiredArg() + .ofType(classOf[String]) + + + val options = parser.parse(args: _*) + + val hostAndPort = options.valueOf(brokerList).split(",") + + val brokers = hostAndPort.map(hostPort => { + val temp = hostPort.split(":") + val host = temp(0) + val port = temp(1).toInt + new Broker(Math.random().toInt, host, port) + }) + + val topics = options.valueOf(topicList).split(",").toSeq + + val filePath = options.valueOf(filePathArg) + + //Consumer Config properties + val configFile = options.valueOf(consumerConfigFile) + val consumerConfig = new ConsumerConfig(Utils.loadProps(configFile)) + //("/Users/mgharat/kafka/config/consumer.properties")) + + val rwFlag = options.valueOf(readWrite) + + val offSetTool = new OffsetTool(consumerConfig, brokers) + offSetTool.readWrite(rwFlag, filePath, topics) + println("rwFlag : " + rwFlag) + } + + /** + * + * @param config Consumer Configuration + * @param brokers Brokers in the cluster + */ + private class OffsetTool(val config: ConsumerConfig, brokers: Seq[Broker]) { + + /** + * + * @param rwFlag 0 = read, 1 = write + * @param fileName File Path to file which to write to or read from + * @param topics list of topics whose partitions, offsets to fetch + */ + def readWrite(rwFlag: Int, fileName: String, topics: Seq[String] = null) { + rwFlag match { + //Read from OffsetManager and write to a file + case 0 => + val file = new File(fileName) + var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None + + while (!offSetFetchResponseOpt.isDefined) { + offSetFetchResponseOpt = OffsetClient.fetch(config, brokers, topics) + } + val offsetFetchResonse = offSetFetchResponseOpt.get + offsetFetchResonse.requestInfo.foreach(x => println((x._1).topic + " : " + (x._1).partition + ": " + (x._2).offset)) + val topicPartitionOffsetMap = offsetFetchResonse.requestInfo + + //Writing to file + val writer = new BufferedWriter(new FileWriter(file)) + topicPartitionOffsetMap.foreach(tpo => { + val topic = tpo._1.topic + val partition = tpo._1.partition + val offset = 
tpo._2.offset + writer.write(topic + "/" + partition + "/" + offset) + writer.newLine() + }) + writer.flush() + writer.close() + + //Read from the file and write to the OffsetManager + case 1 => + //Reading from a file line by line. Each line is of the form topic/partition/offset + val fileLines = Source.fromFile(fileName).getLines().toList + val offsetsToCommit = fileLines.map(line => { + val topicPartitionOffset = line.split("/") + val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt) + val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toInt) + println(topicAndPartition + "->" + offsetMetadata) + (topicAndPartition -> offsetMetadata) + }).toMap + + println(OffsetClient.commit(config, brokers, offsetsToCommit)) + } + } + } + +} -- 1.9.3 (Apple Git-50) From 1e23aebb1f268947977470e01f0e4c495c41c337 Mon Sep 17 00:00:00 2001 From: mgharat Date: Mon, 22 Sep 2014 21:47:15 -0700 Subject: [PATCH 2/2] OffsetCLient Tool API. ImportZkOffsets and ExportZkOffsets replaced by ImportOffsets and ExportOffsets --- .../consumer/ZookeeperConsumerConnector.scala | 339 +++++++++------------ .../main/scala/kafka/tools/ConfigConstants.scala | 11 + .../src/main/scala/kafka/tools/ExportOffsets.scala | 195 ++++++++++++ .../src/main/scala/kafka/tools/ImportOffsets.scala | 187 ++++++++++++ core/src/main/scala/kafka/tools/OffsetClient.scala | 158 +++++----- 5 files changed, 616 insertions(+), 274 deletions(-) create mode 100644 core/src/main/scala/kafka/tools/ConfigConstants.scala create mode 100644 core/src/main/scala/kafka/tools/ExportOffsets.scala create mode 100644 core/src/main/scala/kafka/tools/ImportOffsets.scala diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..2248277 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,6 +31,7 @@ import kafka.common._ import kafka.metrics._ import kafka.network.BlockingChannel import kafka.serializer._ +import kafka.tools.OffsetClient import kafka.utils.Utils.inLock import kafka.utils.ZkUtils._ import kafka.utils._ @@ -58,8 +59,8 @@ import scala.collection._ * * 2. Broker node registry: * /brokers/[0...N] --> { "host" : "host:port", - * "topics" : {"topic1": ["partition1" ... "partitionN"], ..., - * "topicN": ["partition1" ... "partitionN"] } } + * "topics" : {"topic1": ["partition1" ... "partitionN"], ..., + * "topicN": ["partition1" ... "partitionN"] } } * This is a list of all present broker brokers. A unique logical node id is configured on each broker node. A broker * node registers itself on start-up and creates a znode with the logical node id under /brokers. 
The value of the znode * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that @@ -82,7 +83,7 @@ private[kafka] object ZookeeperConsumerConnector { private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only - extends ConsumerConnector with Logging with KafkaMetricsGroup { + extends ConsumerConnector with Logging with KafkaMetricsGroup { private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object @@ -109,15 +110,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) val consumerIdString = { - var consumerUuid : String = null + var consumerUuid: String = null config.consumerId match { case Some(consumerId) // for testing only => consumerUuid = consumerId case None // generate unique consumerId automatically => val uuid = UUID.randomUUID() - consumerUuid = "%s-%d-%s".format( - InetAddress.getLocalHost.getHostName, System.currentTimeMillis, - uuid.getMostSignificantBits().toHexString.substring(0,8)) + consumerUuid = "%s-%d-%s".format( + InetAddress.getLocalHost.getHostName, System.currentTimeMillis, + uuid.getMostSignificantBits().toHexString.substring(0, 8)) } config.groupId + "_" + consumerUuid } @@ -131,32 +132,32 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, scheduler.startup info("starting auto committer every " + config.autoCommitIntervalMs + " ms") scheduler.schedule("kafka-consumer-autocommit", - autoCommit, - delay = config.autoCommitIntervalMs, - period = config.autoCommitIntervalMs, - unit = TimeUnit.MILLISECONDS) + autoCommit, + delay = config.autoCommitIntervalMs, + period = config.autoCommitIntervalMs, + unit = TimeUnit.MILLISECONDS) } KafkaMetricsReporter.startReporters(config.props) def this(config: ConsumerConfig) = this(config, true) - def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] = + def createMessageStreams(topicCountMap: Map[String, Int]): Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) - def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) - : Map[String, List[KafkaStream[K,V]]] = { + def createMessageStreams[K, V](topicCountMap: Map[String, Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) + : Map[String, List[KafkaStream[K, V]]] = { if (messageStreamCreated.getAndSet(true)) throw new MessageStreamsExistException(this.getClass.getSimpleName + - " can create message streams at most once",null) + " can create message streams at most once", null) consume(topicCountMap, keyDecoder, valueDecoder) } - def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, - numStreams: Int, - keyDecoder: Decoder[K] = new DefaultDecoder(), - valueDecoder: Decoder[V] = new DefaultDecoder()) = { - val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder) + def createMessageStreamsByFilter[K, V](topicFilter: TopicFilter, + numStreams: Int, + keyDecoder: Decoder[K] = new DefaultDecoder(), + valueDecoder: Decoder[V] = new DefaultDecoder()) = { + val wildcardStreamsHandler = new WildcardStreamsHandler[K, V](topicFilter, numStreams, keyDecoder, valueDecoder) 
wildcardStreamsHandler.streams } @@ -191,7 +192,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, wildcardTopicWatcher.shutdown() try { if (config.autoCommitEnable) - scheduler.shutdown() + scheduler.shutdown() fetcher match { case Some(f) => f.stopConnections case None => @@ -214,8 +215,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) - : Map[String,List[KafkaStream[K,V]]] = { + def consume[K, V](topicCountMap: scala.collection.Map[String, Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) + : Map[String, List[KafkaStream[K, V]]] = { debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") @@ -227,8 +228,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // make a list of (queue,stream) pairs, one pair for each threadId val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => { - val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) - val stream = new KafkaStream[K,V]( + val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) + val stream = new KafkaStream[K, V]( queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) @@ -238,7 +239,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, registerConsumerInZK(dirs, consumerIdString, topicCount) reinitializeConsumer(topicCount, queuesAndStreams) - loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]] + loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K, V]]]] } // this API is used by unit tests only @@ -248,10 +249,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, info("begin registering consumer " + consumerIdString + " in ZK") val timestamp = SystemTime.milliseconds.toString val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, - "timestamp" -> timestamp)) + "timestamp" -> timestamp)) createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, - (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) + (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) info("end registering consumer " + consumerIdString + " in ZK") } @@ -271,7 +272,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } catch { case t: Throwable => - // log it and let it go + // log it and let it go error("exception during autoCommit: ", t) } } @@ -290,77 +291,34 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, var done = false while (!done) { - val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors + val committed = offsetsChannelLock synchronized { + // committed when we receive either no error codes or only MetadataTooLarge errors val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => partitionTopicInfos.map { case (partition, info) => TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) } - }.toSeq:_*) - - if (offsetsToCommit.size > 0) { - 
if (config.offsetsStorage == "zookeeper") { - offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) => - commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) - } - true - } else { - val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId) - ensureOffsetManagerConnected() - try { - kafkaCommitMeter.mark(offsetsToCommit.size) - offsetsChannel.send(offsetCommitRequest) - val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) - trace("Offset commit response: %s.".format(offsetCommitResponse)) - - val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { - offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) => - - if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) { - val offset = offsetsToCommit(topicPartition).offset - commitOffsetToZooKeeper(topicPartition, offset) - } - - (folded._1 || // update commitFailed - errorCode != ErrorMapping.NoError, - - folded._2 || // update retryableIfFailed - (only metadata too large is not retryable) - (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), - - folded._3 || // update shouldRefreshCoordinator - errorCode == ErrorMapping.NotCoordinatorForConsumerCode || - errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, + }.toSeq: _*) - // update error count - folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) - } - } - debug(errorCount + " errors in offset commit response.") - - - if (shouldRefreshCoordinator) { - debug("Could not commit offsets (because offset coordinator has moved or is unavailable).") - offsetsChannel.disconnect() - } + if (offsetsToCommit.size <= 0) { + debug("No updates to offsets since last commit.") + true + } - if (commitFailed && retryableIfFailed) - false - else - true - } - catch { - case t: Throwable => - error("Error while committing offsets.", t) - offsetsChannel.disconnect() - false - } + if (config.offsetsStorage == "zookeeper") { + offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) => + commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) + checkpointedZkOffsets.put(topicAndPartition, offsetAndMetadata.offset) } - } else { - debug("No updates to offsets since last commit.") true } + else { + kafkaCommitMeter.mark(offsetsToCommit.size) + OffsetClient.commit(config, ZkUtils.getAllBrokersInCluster(zkClient), offsetsToCommit, config.groupId) + } } - done = if (isShuttingDown.get() && isAutoCommit) { // should not retry indefinitely if shutting down + done = if (isShuttingDown.get() && isAutoCommit) { + // should not retry indefinitely if shutting down retriesRemaining -= 1 retriesRemaining == 0 || committed } else @@ -368,7 +326,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (!done) { debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs)) - Thread.sleep(config.offsetsChannelBackoffMs) + TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) } } } @@ -387,73 +345,47 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, Some(OffsetFetchResponse(Map.empty)) else if (config.offsetsStorage == "zookeeper") { val offsets = partitions.map(fetchOffsetFromZooKeeper) - Some(OffsetFetchResponse(immutable.Map(offsets:_*))) + Some(OffsetFetchResponse(immutable.Map(offsets: _*))) } else { val offsetFetchRequest = OffsetFetchRequest(groupId = 
config.groupId, requestInfo = partitions, clientId = config.clientId) var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None + while (!isShuttingDown.get && !offsetFetchResponseOpt.isDefined) { offsetFetchResponseOpt = offsetsChannelLock synchronized { - ensureOffsetManagerConnected() try { - offsetsChannel.send(offsetFetchRequest) - val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer) - trace("Offset fetch response: %s.".format(offsetFetchResponse)) - - val (leaderChanged, loadInProgress) = - offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) => - (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode), - folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode)) - } - - if (leaderChanged) { - offsetsChannel.disconnect() - debug("Could not fetch offsets (because offset manager has moved).") - None // retry - } - else if (loadInProgress) { - debug("Could not fetch offsets (because offset cache is being loaded).") - None // retry - } - else { - if (config.dualCommitEnabled) { - // if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the - // maximum between offsets in zookeeper and kafka. - val kafkaOffsets = offsetFetchResponse.requestInfo - val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) => - val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset - val mostRecentOffset = zkOffset.max(kafkaOffset.offset) - (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError)) - } - Some(OffsetFetchResponse(mostRecentOffsets)) + var offsetFetchResponse = OffsetClient.fetch(config, ZkUtils.getAllBrokersInCluster(zkClient), collection.Seq(), config.groupId).get + + if (config.dualCommitEnabled) { + // if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the + // maximum between offsets in zookeeper and kafka. + val kafkaOffsets = offsetFetchResponse.requestInfo + val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) => + val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset + val mostRecentOffset = zkOffset.max(kafkaOffset.offset) + (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError)) } - else - Some(offsetFetchResponse) + Some(OffsetFetchResponse(mostRecentOffsets)) } + else + Some(offsetFetchResponse) } catch { case e: Exception => warn("Error while fetching offsets from %s:%d. Possible cause: %s".format(offsetsChannel.host, offsetsChannel.port, e.getMessage)) - offsetsChannel.disconnect() None // retry } } - - if (offsetFetchResponseOpt.isEmpty) { - debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs)) - Thread.sleep(config.offsetsChannelBackoffMs) - } } offsetFetchResponseOpt } } - class ZKSessionExpireListener(val dirs: ZKGroupDirs, - val consumerIdString: String, - val topicCount: TopicCount, - val loadBalancerListener: ZKRebalancerListener) + val consumerIdString: String, + val topicCount: TopicCount, + val loadBalancerListener: ZKRebalancerListener) extends IZkStateListener { @throws(classOf[Exception]) def handleStateChanged(state: KeeperState) { @@ -465,14 +397,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * any ephemeral nodes here. * * @throws Exception - * On any error. + * On any error. 
*/ @throws(classOf[Exception]) def handleNewSession() { /** - * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a - * connection for us. We need to release the ownership of the current consumer and re-register this - * consumer in the consumer registry and trigger a rebalance. + * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a + * connection for us. We need to release the ownership of the current consumer and re-register this + * consumer in the consumer registry and trigger a rebalance. */ info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString) loadBalancerListener.resetState() @@ -488,7 +420,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) extends IZkDataListener { - def handleDataChange(dataPath : String, data: Object) { + def handleDataChange(dataPath: String, data: Object) { try { info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance") // queue up the rebalance event @@ -496,19 +428,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // There is no need to re-subscribe the watcher since it will be automatically // re-registered upon firing of this event by zkClient } catch { - case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e ) + case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e) } } @throws(classOf[Exception]) - def handleDataDeleted(dataPath : String) { + def handleDataDeleted(dataPath: String) { // TODO: This need to be implemented when we support delete topic warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time") } } class ZKRebalancerListener(val group: String, val consumerIdString: String, - val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) + val kafkaMessageAndMetadataStreams: mutable.Map[String, List[KafkaStream[_, _]]]) extends IZkChildListener { private val partitionAssignor = PartitionAssignor.createInstance(config.partitionAssignmentStrategy) @@ -516,7 +448,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() - + @volatile private var allTopicsOwnedPartitionsCount = 0 newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] { def value() = allTopicsOwnedPartitionsCount @@ -552,7 +484,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, watcherExecutorThread.start() @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + def handleChildChange(parentPath: String, curChilds: java.util.List[String]) { rebalanceEventTriggered() } @@ -570,10 +502,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, debug("Consumer " + consumerIdString + " releasing " + znode) } - private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= { + private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]]) = { info("Releasing partition ownership") for ((topic, infos) <- localTopicRegistry) { - for(partition <- infos.keys) { + for 
(partition <- infos.keys) { deletePartitionOwnershipFromZK(topic, partition) } removeMetric(ownedPartitionsCountMetricName(topic)) @@ -589,7 +521,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def syncedRebalance() { rebalanceLock synchronized { rebalanceTimer.time { - if(isShuttingDown.get()) { + if (isShuttingDown.get()) { return } else { for (i <- 0 until config.rebalanceMaxRetries) { @@ -601,10 +533,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, done = rebalance(cluster) } catch { case e: Throwable => + /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. * For example, a ZK node can disappear between the time we get all children and the time we try to get * the value of a child. Just let this go since another rebalance will be triggered. - **/ + * */ info("exception during rebalance ", e) } info("end rebalancing consumer " + consumerIdString + " try #" + i) @@ -623,7 +556,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries") + throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries + " retries") } private def rebalance(cluster: Cluster): Boolean = { @@ -674,11 +607,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt * A rebalancing attempt is completed successfully only after the fetchers have been started correctly */ - if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { + if (reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size - partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } - .foreach { case (topic, partitionThreadPairs) => + partitionOwnershipDecision.view.groupBy { case (topicPartition, consumerThreadId) => topicPartition.topic} + .foreach { case (topic, partitionThreadPairs) => newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] { def value() = partitionThreadPairs.size }) @@ -695,7 +628,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def closeFetchersForQueues(cluster: Cluster, - messageStreams: Map[String,List[KafkaStream[_,_]]], + messageStreams: Map[String, List[KafkaStream[_, _]]], queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) { val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten fetcher match { @@ -703,37 +636,38 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, f.stopConnections clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) info("Committing all offsets after clearing the fetcher queues") + /** - * here, we need to commit offsets before stopping the consumer from returning any more messages - * from the current data chunk. Since partition ownership is not yet released, this commit offsets - * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition - * for the current data chunk. 
Since the fetchers are already shutdown and this is the last chunk to be iterated - * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes - * successfully and the fetchers restart to fetch more data chunks - **/ - if (config.autoCommitEnable) - commitOffsets() + * here, we need to commit offsets before stopping the consumer from returning any more messages + * from the current data chunk. Since partition ownership is not yet released, this commit offsets + * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition + * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated + * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes + * successfully and the fetchers restart to fetch more data chunks + **/ + if (config.autoCommitEnable) + commitOffsets() case None => } } private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]], - messageStreams: Map[String,List[KafkaStream[_,_]]]) { + messageStreams: Map[String, List[KafkaStream[_, _]]]) { // Clear all but the currently iterated upon chunk in the consumer thread's queue queuesTobeCleared.foreach(_.clear) info("Cleared all relevant queues for this fetcher") // Also clear the currently iterated upon chunk in the consumer threads - if(messageStreams != null) - messageStreams.foreach(_._2.foreach(s => s.clear())) + if (messageStreams != null) + messageStreams.foreach(_._2.foreach(s => s.clear())) info("Cleared the data chunks in all the consumer message iterators") } - private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]], + private def closeFetchers(cluster: Cluster, messageStreams: Map[String, List[KafkaStream[_, _]]], relevantTopicThreadIdsMap: Map[String, Set[ConsumerThreadId]]) { // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer // after this rebalancing attempt @@ -743,12 +677,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def updateFetcher(cluster: Cluster) { // update partitions for fetcher - var allPartitionInfos : List[PartitionTopicInfo] = Nil + var allPartitionInfos: List[PartitionTopicInfo] = Nil for (partitionInfos <- topicRegistry.values) for (partition <- partitionInfos.values) allPartitionInfos ::= partition info("Consumer " + consumerIdString + " selected partitions : " + - allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(",")) + allPartitionInfos.sortWith((s, t) => s.partitionId < t.partitionId).map(_.toString).mkString(",")) fetcher match { case Some(f) => @@ -758,7 +692,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { - var successfullyOwnedPartitions : List[(String, Int)] = Nil + var successfullyOwnedPartitions: List[(String, Int)] = Nil val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val topic = partitionOwner._1.topic val partition = partitionOwner._1.partition @@ -767,7 +701,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, 
consumerThreadId.toString) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) - successfullyOwnedPartitions ::= (topic, partition) + successfullyOwnedPartitions ::=(topic, partition) true } catch { case e: ZkNodeExistsException => @@ -777,9 +711,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case e2: Throwable => throw e2 } } - val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1)) + val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if (decision) 0 else 1)) /* even if one of the partition ownership attempt has failed, return false */ - if(hasPartitionOwnershipFailed > 0) { + if (hasPartitionOwnershipFailed > 0) { // remove all paths that we have owned in ZK successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2)) false @@ -796,29 +730,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val consumedOffset = new AtomicLong(offset) val fetchedOffset = new AtomicLong(offset) val partTopicInfo = new PartitionTopicInfo(topic, - partition, - queue, - consumedOffset, - fetchedOffset, - new AtomicInteger(config.fetchMessageMaxBytes), - config.clientId) + partition, + queue, + consumedOffset, + fetchedOffset, + new AtomicInteger(config.fetchMessageMaxBytes), + config.clientId) partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset) } } - private def reinitializeConsumer[K,V]( - topicCount: TopicCount, - queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) { + private def reinitializeConsumer[K, V]( + topicCount: TopicCount, + queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk], KafkaStream[K, V])]) { val dirs = new ZKGroupDirs(config.groupId) // listener to consumer and partition changes if (loadBalancerListener == null) { - val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]] + val topicStreamsMap = new mutable.HashMap[String, List[KafkaStream[K, V]]] loadBalancerListener = new ZKRebalancerListener( - config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) + config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_, _]]]]) } // create listener for session expired event if not exist yet @@ -848,13 +782,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } val topicThreadIds = consumerThreadIdsPerTopic.map { - case(topic, threadIds) => + case (topic, threadIds) => threadIds.map((topic, _)) }.flatten require(topicThreadIds.size == allQueuesAndStreams.size, "Mismatch between thread ID count (%d) and queue count (%d)" - .format(topicThreadIds.size, allQueuesAndStreams.size)) + .format(topicThreadIds.size, allQueuesAndStreams.size)) val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams) threadQueueStreamPairs.foreach(e => { @@ -893,11 +827,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, loadBalancerListener.syncedRebalance() } - class WildcardStreamsHandler[K,V](topicFilter: TopicFilter, - numStreams: Int, - keyDecoder: Decoder[K], - valueDecoder: Decoder[V]) - extends TopicEventHandler[String] { + class 
WildcardStreamsHandler[K, V](topicFilter: TopicFilter, + numStreams: Int, + keyDecoder: Decoder[K], + valueDecoder: Decoder[V]) + extends TopicEventHandler[String] { if (messageStreamCreated.getAndSet(true)) throw new RuntimeException("Each consumer connector can create " + @@ -905,16 +839,16 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val wildcardQueuesAndStreams = (1 to numStreams) .map(e => { - val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) - val stream = new KafkaStream[K,V](queue, - config.consumerTimeoutMs, - keyDecoder, - valueDecoder, - config.clientId) - (queue, stream) + val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) + val stream = new KafkaStream[K, V](queue, + config.consumerTimeoutMs, + keyDecoder, + valueDecoder, + config.clientId) + (queue, stream) }).toList - // bootstrap with existing topics + // bootstrap with existing topics private var wildcardTopics = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) .filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics)) @@ -940,7 +874,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val addedTopics = updatedTopics filterNot (wildcardTopics contains) if (addedTopics.nonEmpty) info("Topic event: added topics = %s" - .format(addedTopics)) + .format(addedTopics)) /* * TODO: Deleted topics are interesting (and will not be a concern until @@ -950,7 +884,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val deletedTopics = wildcardTopics filterNot (updatedTopics contains) if (deletedTopics.nonEmpty) info("Topic event: deleted topics = %s" - .format(deletedTopics)) + .format(deletedTopics)) wildcardTopics = updatedTopics info("Topics to consume = %s".format(wildcardTopics)) @@ -959,7 +893,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) } - def streams: Seq[KafkaStream[K,V]] = + def streams: Seq[KafkaStream[K, V]] = wildcardQueuesAndStreams.map(_._2) } + } diff --git a/core/src/main/scala/kafka/tools/ConfigConstants.scala b/core/src/main/scala/kafka/tools/ConfigConstants.scala new file mode 100644 index 0000000..b4ed9dc --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConfigConstants.scala @@ -0,0 +1,11 @@ +package kafka.tools + +/** + * Created by mgharat on 9/17/14. + */ +object ConfigConstants { + val offsetsChannelBackoffMS = 10000 + val offsetsCommitMaxRetries = 5 + val clientId = "OffsetToolClient" + val socketTimeoutMS = 10000 +} diff --git a/core/src/main/scala/kafka/tools/ExportOffsets.scala b/core/src/main/scala/kafka/tools/ExportOffsets.scala new file mode 100644 index 0000000..3cf3824 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ExportOffsets.scala @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.io.{BufferedWriter, File, FileWriter} +import joptsimple._ +import kafka.api.OffsetFetchResponse +import kafka.cluster.Broker +import kafka.consumer.ConsumerConfig +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient + + +/** + * A utility that retrieve the offset of broker partitions in ZK and + * prints to an output file in the following format: + * + * /consumers/group1/offsets/topic1/1-0:286894308 + * /consumers/group1/offsets/topic1/2-0:284803985 + * + * This utility expects 3 arguments: + * 1. Zk host:port string + * 2. group name (all groups implied if omitted) + * 3. output filename + * + * To print debug message, add the following line to log4j.properties: + * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG + * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) + */ +object ExportOffsets extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + + val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") + .withRequiredArg() + .defaultsTo("localhost:2181") + .ofType(classOf[String]) + + val groupOpt = parser.accepts("group", "Consumer group.") + .withRequiredArg() + .ofType(classOf[String]) + + val outFileOpt = parser.accepts("output-file", "Output file") + .withRequiredArg() + .ofType(classOf[String]) + + val brokerList = parser.accepts("broker-list", "List of brokers") + .withRequiredArg() + .ofType(classOf[String]) + + val offsetClientConfigFile = parser.accepts("config-file", "Consumer config file") + .withRequiredArg() + .ofType(classOf[String]) + + val topicList = parser.accepts("topics", "Topics for which offsets/Partitions should be fetched") + .withRequiredArg() + .ofType(classOf[String]) + + parser.accepts("help", "Print this message.") + + if (args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.") + + val options = parser.parse(args: _*) + + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + + var brokers = collection.Seq[Broker]() + val allBrokers = options.valueOf(brokerList) + + if (allBrokers != null) { + val hostAndPort = allBrokers.split(",") + brokers = hostAndPort.map(hostPort => { + val temp = hostPort.split(":") + val host = temp(0) + val port = temp(1).toInt + new Broker(Math.random().toInt, host, port) + }) + } + + var topics = collection.Seq[String]() + if (topicList != null) { + topics = options.valueOf(topicList).split(",").toSeq + } + + var offsetClientConfig = options.valueOf(offsetClientConfigFile) + var config: ConsumerConfig = null + if (offsetClientConfig != null) { + config = new ConsumerConfig((Utils.loadProps(offsetClientConfig))) + } + + val outfile = options.valueOf(outFileOpt) + + val groupId = options.valueOf(groupOpt) + + // checking whether to use zookeeper or offsetManager + if (allBrokers != null) { + val file = new File(outfile) + var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None + + while (!offSetFetchResponseOpt.isDefined) { + offSetFetchResponseOpt = OffsetClient.fetch(config, brokers, 
topics, groupId) + } + val offsetFetchResponse = offSetFetchResponseOpt.get + offsetFetchResponse.requestInfo.foreach(x => println((x._1).topic + " : " + (x._1).partition + ": " + (x._2).offset)) + val topicPartitionOffsetMap = offsetFetchResponse.requestInfo + + // writing to file + val writer = new BufferedWriter(new FileWriter(file)) + topicPartitionOffsetMap.foreach(tpo => { + val topic = tpo._1.topic + val partition = tpo._1.partition + val offset = tpo._2.offset + writer.write(topic + "/" + partition + "/" + offset) + writer.newLine() + }) + writer.flush() + writer.close() + } + else { + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) + + val zkConnect = options.valueOf(zkConnectOpt) + val groups = options.valuesOf(groupOpt) + + var zkClient: ZkClient = null + val fileWriter: FileWriter = new FileWriter(outfile) + + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + var consumerGroups: Seq[String] = null + + if (groups.size == 0) { + consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList + } + else { + import scala.collection.JavaConversions._ + consumerGroups = groups + } + + for (consumerGrp <- consumerGroups) { + val topicsList = getTopicsList(zkClient, consumerGrp) + + for (topic <- topicsList) { + val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) + + for (bidPid <- bidPidList) { + val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic) + val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid + ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { + case Some(offsetVal) => + fileWriter.write(offsetPath + ":" + offsetVal + "\n") + debug(offsetPath + " => " + offsetVal) + case None => + error("Could not retrieve offset value from " + offsetPath) + } + } + } + } + } + finally { + fileWriter.flush() + fileWriter.close() + } + } + } + + private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { + return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList + } + + private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { + return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList + } +} diff --git a/core/src/main/scala/kafka/tools/ImportOffsets.scala b/core/src/main/scala/kafka/tools/ImportOffsets.scala new file mode 100644 index 0000000..e7bdbd3 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ImportOffsets.scala @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import java.io.BufferedReader +import java.io.FileReader +import java.util.concurrent.TimeUnit +import joptsimple._ +import kafka.cluster.Broker +import kafka.common.{OffsetAndMetadata, TopicAndPartition} +import kafka.consumer.ConsumerConfig +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient + +import scala.io.Source + + +/** + * A utility that updates the offset of broker partitions in ZK. + * + * This utility expects 2 input files as arguments: + * 1. consumer properties file + * 2. a file contains partition offsets data such as: + * (This output data file can be obtained by running kafka.tools.ExportZkOffsets) + * + * /consumers/group1/offsets/topic1/3-0:285038193 + * /consumers/group1/offsets/topic1/1-0:286894308 + * + * To print debug message, add the following line to log4j.properties: + * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG + * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) + */ +object ImportOffsets extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + + val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") + .withRequiredArg() + .defaultsTo("localhost:2181") + .ofType(classOf[String]) + + val inFileOpt = parser.accepts("input-file", "Input file") + .withRequiredArg() + .ofType(classOf[String]) + + val brokerList = parser.accepts("broker-list", "List of brokers") + .withRequiredArg() + .ofType(classOf[String]) + + val offsetClientConfigFile = parser.accepts("config-file", "Consumer config file") + .withRequiredArg() + .ofType(classOf[String]) + + val group = parser.accepts("group", "Consumer Group Id") + .withRequiredArg() + .ofType(classOf[String]) + + parser.accepts("help", "Print this message.") + + if (args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.") + + val options = parser.parse(args: _*) + + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + + var brokers = collection.Seq[Broker]() + val allBrokers = options.valueOf(brokerList) + + if (allBrokers != null) { + val hostAndPort = options.valueOf(brokerList).split(",") + brokers = hostAndPort.map(hostPort => { + val temp = hostPort.split(":") + val host = temp(0) + val port = temp(1).toInt + new Broker(Math.random().toInt, host, port) + }) + } + + var offsetClientConfig = options.valueOf(offsetClientConfigFile) + var config: ConsumerConfig = null + if (offsetClientConfig != null) { + config = new ConsumerConfig((Utils.loadProps(offsetClientConfig))) + } + + val partitionOffsetFile = options.valueOf(inFileOpt) + + var groupId: String = options.valueOf(group) + + // checking whether to use zookeeper or offsetManager + if (allBrokers != null) { + val fileLines = Source.fromFile(partitionOffsetFile).getLines().toList + val offsetsToCommit = fileLines.map(line => { + val topicPartitionOffset = line.split("/") + val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt) + val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toInt) + println(topicAndPartition + "->" + offsetMetadata) + (topicAndPartition -> offsetMetadata) + }).toMap + + var done = false + var offsetsCommitRetries = 0 + var offsetsChannelBackoffMS = 0 + + if (config != null) { + offsetsCommitRetries = config.offsetsCommitMaxRetries + offsetsChannelBackoffMS = config.offsetsChannelBackoffMs + } + else { + offsetsCommitRetries = ConfigConstants.offsetsCommitMaxRetries + 
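// these defaults come from ConfigConstants because no --config-file was supplied +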
+      while (!done && retriesRemaining > 0) {
+        done = OffsetClient.commit(config, brokers, offsetsToCommit, groupId)
+        retriesRemaining = retriesRemaining - 1
+
+        if (!done) {
+          debug("Retrying offset commit in %d ms".format(offsetsChannelBackoffMS))
+          TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
+        }
+      }
+
+    }
+    else {
+      CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
+
+      val zkConnect = options.valueOf(zkConnectOpt)
+
+      val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+      updateZkOffsets(zkClient, partitionOffsets)
+    }
+  }
+
+  private def getPartitionOffsetsFromFile(filename: String): Map[String, String] = {
+    val fr = new FileReader(filename)
+    val br = new BufferedReader(fr)
+    var partOffsetsMap: Map[String, String] = Map()
+
+    var s: String = br.readLine()
+    while (s != null && s.length() >= 1) {
+      val tokens = s.split(":")
+
+      partOffsetsMap += tokens(0) -> tokens(1)
+      debug("adding node path [" + s + "]")
+
+      s = br.readLine()
+    }
+    br.close()
+
+    return partOffsetsMap
+  }
+
+  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String, String]): Unit = {
+    for ((partition, offset) <- partitionOffsets) {
+      debug("updating [" + partition + "] with offset [" + offset + "]")
+
+      try {
+        ZkUtils.updatePersistentPath(zkClient, partition, offset)
+      } catch {
+        case e: Throwable => error("Could not update " + partition, e)
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
index c62a31c..af4a7a8 100644
--- a/core/src/main/scala/kafka/tools/OffsetClient.scala
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -20,19 +20,25 @@ import scala.util.Random
 object OffsetClient extends Logging {
 
   private def getQueryChannel(config: ConsumerConfig, brokers: Seq[Broker]): BlockingChannel = {
+    var socketTimeoutMS = 0
+    if (config != null) {
+      socketTimeoutMS = config.socketTimeoutMs
+    }
+    else {
+      socketTimeoutMS = ConfigConstants.socketTimeoutMS
+    }
+
     var channel: BlockingChannel = null
     var connected = false
-    // val shuffledBrokers = Random.shuffle(brokers)
+    val shuffledBrokers = Random.shuffle(brokers)
     var i: Int = 0
 
-    while ( /*i < shuffledBrokers.size &&*/ !connected) {
-      // val broker = shuffledBrokers(i)
-      // i = i + 1
-      val shuffledBrokers = Random.shuffle(brokers)
-      val broker = shuffledBrokers(0)
+    while (i < shuffledBrokers.size && !connected) {
+      val broker = shuffledBrokers(i)
+      i = i + 1
       trace("Connecting to the broker %s:%d.".format(broker.host, broker.port))
       try {
-        channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.socketTimeoutMs)
+        channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMS)
         debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
         true
       }
@@ -50,10 +56,21 @@ object OffsetClient extends Logging {
 
   }
 
-  private def getOffsetManagerChannel(config: ConsumerConfig, /*brokerMap: collection.Map[Int, Broker]*/ brokers: Seq[Broker]): Option[BlockingChannel] = {
+  private def getOffsetManagerChannel(config: ConsumerConfig, brokers: Seq[Broker], group: String): Option[BlockingChannel] = {
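+    // Fall back to the tool-level ConfigConstants defaults when the caller does not supply a ConsumerConfig.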
+    var offsetsChannelBackoffMS = 0
+    var socketTimeoutMS = 0
+    if (config != null) {
+      offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
+      socketTimeoutMS = config.socketTimeoutMs
+    }
+    else {
+      offsetsChannelBackoffMS = ConfigConstants.offsetsChannelBackoffMS
+      socketTimeoutMS = ConfigConstants.socketTimeoutMS
+    }
+
     var offSetManagerChannel: BlockingChannel = null
 
-    var queryChannel = getQueryChannel(config, /*brokerMap.values.toSeq*/ brokers)
+    var queryChannel = getQueryChannel(config, brokers)
 
     var offSetManagerChannelOpt: Option[BlockingChannel] = None
 
@@ -64,10 +81,10 @@ object OffsetClient extends Logging {
       while (!coordinatorOpt.isDefined) {
         try {
           if (!queryChannel.isConnected) {
-            queryChannel = getQueryChannel(config, /*brokerMap.values.toSeq*/ brokers)
+            queryChannel = getQueryChannel(config, brokers)
           }
-          debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, config.groupId))
-          queryChannel.send(ConsumerMetadataRequest(config.groupId))
+          debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
+          queryChannel.send(ConsumerMetadataRequest(group))
           val response = queryChannel.receive()
           val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
           debug("Consumer metadata response: " + consumerMetadataResponse.toString)
@@ -75,9 +92,9 @@ object OffsetClient extends Logging {
             coordinatorOpt = consumerMetadataResponse.coordinatorOpt
           }
           else {
-            debug("Query to %s:%d to locate offset manager for %s failed - will retry in % milliseconds."
-              .format(queryChannel.host, queryChannel.port, config.groupId, config.offsetsChannelBackoffMs))
-            Thread.sleep(config.offsetsChannelBackoffMs)
+            debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
+              .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMS))
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
           }
         }
         catch {
@@ -95,7 +112,7 @@ object OffsetClient extends Logging {
         val connectString = "%s:%d".format(coordinator.host, coordinator.port)
         try {
           offSetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, BlockingChannel.UseDefaultBufferSize,
-            BlockingChannel.UseDefaultBufferSize, config.socketTimeoutMs)
+            BlockingChannel.UseDefaultBufferSize, socketTimeoutMS)
           offSetManagerChannel.connect()
           offSetManagerChannelOpt = Some(offSetManagerChannel)
           queryChannel.disconnect()
@@ -104,7 +121,7 @@ object OffsetClient extends Logging {
           case ioe: IOException =>
             info("Error while connecting to %s.".format(connectString))
            if (offSetManagerChannel != null) offSetManagerChannel.disconnect()
-            TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs)
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
             offSetManagerChannelOpt = None
         }
       }
@@ -126,7 +143,7 @@ object OffsetClient extends Logging {
          topicMetadataChannel = getQueryChannel(config, brokers)
         }
         info("Fetching metadata from broker %s:%d".format(topicMetadataChannel.host, topicMetadataChannel.port))
-        val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
+        val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, ConfigConstants.clientId, topics)
         topicMetadataChannel.send(topicMetaDataRequest)
         topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer)
         fetchMetadataSucceded = true
@@ -142,11 +159,19 @@ object OffsetClient extends Logging {
   }
 
 
-  def fetch(config: ConsumerConfig, /*brokerConfig: Seq[BrokerConfig]*/ brokers: Seq[Broker], topics: Seq[String]): Option[OffsetFetchResponse] = {
-    // val brokers = brokerConfig.map(config => new Broker(config.brokerId, config.hostName, config.port))
+  def fetch(config: ConsumerConfig = null, brokers: Seq[Broker], topics: Seq[String] = null, group: String): Option[OffsetFetchResponse] = {
     var topicAndPartition = collection.Seq[TopicAndPartition]()
 
-    if (topics.size > 0) {
+    var offsetsChannelBackoffMS = 0
+
+    if (config != null) {
+      offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
+    }
+    else {
+      offsetsChannelBackoffMS = ConfigConstants.offsetsChannelBackoffMS
+    }
+
+    if (topics != null && topics.size > 0) {
       val topicMetaDataResponse: TopicMetadataResponse = fetchTopicMetadata(config, brokers, topics)
       topicAndPartition = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => {
         val topic = topicMetaData.topic
@@ -159,10 +184,10 @@ object OffsetClient extends Logging {
       )
     }
 
-    val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = topicAndPartition, clientId = config.clientId)
+    val offsetFetchRequest = OffsetFetchRequest(groupId = group, requestInfo = topicAndPartition, clientId = ConfigConstants.clientId)
     var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
     while (!offsetFetchResponseOpt.isDefined) {
-      val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get
+      val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get
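+      // The offset manager channel is looked up again on every retry, so a moved coordinator is picked up automatically.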
ms".format(config.offsetsChannelBackoffMs)) - TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) + debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMS)) + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS) } } offsetFetchResponseOpt } - def commit(config: ConsumerConfig, brokers: Seq[Broker], offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata]): Boolean = { - var done = false - var retriesRemaining = 1 + config.offsetsCommitMaxRetries + def commit(config: ConsumerConfig = null, brokers: Seq[Broker], offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata], group: String): Boolean = { var committed = false if (offsetsToCommit.size > 0) { - val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId) - - while (!done) { - val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get - try { - offsetManagerChannel.send(offsetCommitRequest) - val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer) - trace("Offset commit response: %s.".format(offsetCommitResponse)) - - val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { - offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => - - (folded._1 || //update commitFailed - errorCode != ErrorMapping.NoError, + val offsetCommitRequest = OffsetCommitRequest(group, offsetsToCommit, clientId = ConfigConstants.clientId) + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get + try { + offsetManagerChannel.send(offsetCommitRequest) + val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset commit response: %s.".format(offsetCommitResponse)) - folded._2 || //update retryableIfFailed - (only metadata too large is not retryable) - (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), + val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { + offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => - folded._3 || //update shouldRefreshCoordinator - errorCode == ErrorMapping.NotCoordinatorForConsumerCode || - errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, + (folded._1 || //update commitFailed + errorCode != ErrorMapping.NoError, - //update error count - folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) - } - } - debug(errorCount + " errors in offset commit response.") + folded._2 || //update retryableIfFailed - (only metadata too large is not retryable) + (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), - if (shouldRefreshCoordinator) { - debug("Could not commit offsets (because offset coordinator has moved or is unavailable).") - offsetManagerChannel.disconnect() - } + folded._3 || //update shouldRefreshCoordinator + errorCode == ErrorMapping.NotCoordinatorForConsumerCode || + errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, - if (commitFailed && retryableIfFailed) { - committed = false + //update error count + folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) } - else { - committed = true - } - } - catch { - case t: Throwable => - error("Error while commiting offsets.", t) - offsetManagerChannel.disconnect() } + debug(errorCount + " errors in offset commit response.") - done = { - retriesRemaining -= 1 - 
-          retriesRemaining == 0 || committed
+        if (shouldRefreshCoordinator) {
+          debug("Could not commit offsets (because offset coordinator has moved or is unavailable).")
+          offsetManagerChannel.disconnect()
         }
 
-        if (!done) {
-          debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs))
-          TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs)
+        if (commitFailed && retryableIfFailed) {
+          committed = false
+        }
+        else {
+          committed = true
         }
       }
+      catch {
+        case t: Throwable =>
+          error("Error while committing offsets.", t)
+          offsetManagerChannel.disconnect()
+      }
+    }
+    else {
+      debug("No updates to offsets since last commit.")
+      committed = true
     }
 
     committed
-- 
1.9.3 (Apple Git-50)