From 3b15575e888fa318be33d88aaf2784bb06fda553 Mon Sep 17 00:00:00 2001
From: mgharat
Date: Sun, 21 Dec 2014 14:39:06 -0800
Subject: [PATCH] Reverted Changes to ZookeeperConsumerConnector.scala

---
 .../src/main/scala/kafka/tools/ExportOffsets.scala | 243 +++++++++++++++++
 .../src/main/scala/kafka/tools/ImportOffsets.scala | 229 ++++++++++++++++
 core/src/main/scala/kafka/tools/OffsetClient.scala | 303 +++++++++++++++++++++
 .../scala/kafka/tools/OffsetClientConfig.scala     |  39 +++
 4 files changed, 814 insertions(+)
 create mode 100644 core/src/main/scala/kafka/tools/ExportOffsets.scala
 create mode 100644 core/src/main/scala/kafka/tools/ImportOffsets.scala
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClient.scala
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClientConfig.scala

diff --git a/core/src/main/scala/kafka/tools/ExportOffsets.scala b/core/src/main/scala/kafka/tools/ExportOffsets.scala
new file mode 100644
index 0000000..024a033
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ExportOffsets.scala
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools
+
+import java.io.{BufferedWriter, File, FileWriter}
+import joptsimple._
+import kafka.api.{OffsetFetchRequest, TopicMetadataResponse, OffsetFetchResponse}
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+
+
+/**
+ * A utility that retrieves the offsets of a consumer group from ZooKeeper or the
+ * offset manager and writes them to an output file in one of the following formats:
+ *
+ * ZK format
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ * /consumers/group1/offsets/topic1/2-0:284803985
+ *
+ * OffsetManager file format
+ * topic/partition/offset
+ *
+ * This utility expects the following arguments:
+ * When using ZooKeeper:
+ * 1. ZK host:port string
+ * 2. group name (all groups implied if omitted)
+ * 3. output file name
+ *
+ * When using the offset manager:
+ * 1. group name
+ * 2. output file name
+ * 3. broker list (host:port)
+ * 4. config file (optional)
+ * 5. topic list (optional)
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ExportOffsets$=DEBUG
+ * (for Eclipse debugging, copy log4j.properties to the binary directory in "core", such as core/bin)
+ */
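+/*
+ * Illustrative invocations (a sketch, not part of the tool itself; host names,
+ * ports, group and topic names below are made-up examples -- the option names
+ * match the parser defined in this file):
+ *
+ *   Export via the offset manager:
+ *     bin/kafka-run-class.sh kafka.tools.ExportOffsets \
+ *       --broker-list broker1:9092,broker2:9092 --group group1 \
+ *       --topics topic1,topic2 --output-file /tmp/group1.offsets
+ *
+ *   Export via ZooKeeper:
+ *     bin/kafka-run-class.sh kafka.tools.ExportOffsets \
+ *       --zkconnect zkhost:2181 --group group1 --output-file /tmp/group1.offsets
+ */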
+object ExportOffsets extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+      .withRequiredArg()
+      .defaultsTo("localhost:2181")
+      .ofType(classOf[String])
+
+    val groupOpt = parser.accepts("group", "Consumer group.")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val outFileOpt = parser.accepts("output-file", "Output file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket timeout for the offsets channel")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Maximum number of retries for committing offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val topicList = parser.accepts("topics", "Topics for which offsets should be fetched")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.")
+
+    val options = parser.parse(args: _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val allBrokers = options.valueOf(brokerList)
+
+    val topics = {
+      val topicListValue = options.valueOf(topicList)
+      if (topicListValue != null) topicListValue.split(",").toSeq else Seq.empty[String]
+    }
+
+    val socketTimeOut = {
+      val value = options.valueOf(offsetsChannelSocketTimeoutMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+
+    val channelBackOff = {
+      val value = options.valueOf(offsetsChannelBackoffMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+
+    val commitRetries = {
+      val value = options.valueOf(offsetsCommitMaxRetries)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
+    }
+
+    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
+
+    val outfile = options.valueOf(outFileOpt)
+    val groupId = options.valueOf(groupOpt)
+
+    // Decide whether to go through the offset manager (broker list given) or ZooKeeper.
+    // The broker list is only parsed inside this branch so that a missing
+    // --broker-list does not cause a NullPointerException on the ZooKeeper path.
+    if (allBrokers != null) {
+      val brokers = Utils.parseBrokerList(allBrokers)
+      val file = new File(outfile)
+
+      var topicAndPartitions = Seq.empty[TopicAndPartition]
+      if (topics.nonEmpty) {
+        val topicMetadataResponse: TopicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, topics)
+        topicAndPartitions = topicMetadataResponse.topicsMetadata.flatMap(topicMetadata =>
+          topicMetadata.partitionsMetadata.map(partitionMetadata =>
+            TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)))
+      }
+
+      val offsetFetchRequest = OffsetFetchRequest(groupId, requestInfo = topicAndPartitions, clientId = config.clientId)
+
+      // fetchOffsets already retries internally until it obtains a usable response.
+      val offsetFetchResponse = OffsetClient.fetchOffsets(config, brokers, groupId, offsetFetchRequest)
+      debug("Offset fetch response: " + offsetFetchResponse)
+      val topicPartitionOffsetMap = offsetFetchResponse.requestInfo
+
+      // Write one topic/partition/offset triple per line.
+      val writer = new BufferedWriter(new FileWriter(file))
+      try {
+        topicPartitionOffsetMap.foreach { case (topicAndPartition, offsetMetadata) =>
+          writer.write(topicAndPartition.topic + "/" + topicAndPartition.partition + "/" + offsetMetadata.offset)
+          writer.newLine()
+        }
+        writer.flush()
+      }
+      finally {
+        writer.close()
+      }
+    }
+    else {
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)
+
+      val zkConnect = options.valueOf(zkConnectOpt)
+      val groups = options.valuesOf(groupOpt)
+
+      var zkClient: ZkClient = null
+      val fileWriter: FileWriter = new FileWriter(outfile)
+
+      try {
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+        val consumerGroups: Seq[String] = {
+          if (groups.size == 0)
+            ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList
+          else {
+            import scala.collection.JavaConversions._
+            groups
+          }
+        }
+
+        for (consumerGrp <- consumerGroups) {
+          val topicsList = getTopicsList(zkClient, consumerGrp)
+
+          for (topic <- topicsList) {
+            val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic)
+
+            for (bidPid <- bidPidList) {
+              val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic)
+              val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
+              ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match {
+                case Some(offsetVal) =>
+                  fileWriter.write(offsetPath + ":" + offsetVal + "\n")
+                  debug(offsetPath + " => " + offsetVal)
+                case None =>
+                  error("Could not retrieve offset value from " + offsetPath)
+              }
+            }
+          }
+        }
+      }
+      finally {
+        fileWriter.flush()
+        fileWriter.close()
+      }
+    }
+  }
+
+  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] =
+    ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
+
+  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] =
+    ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
+}
diff --git a/core/src/main/scala/kafka/tools/ImportOffsets.scala b/core/src/main/scala/kafka/tools/ImportOffsets.scala
new file mode 100644
index 0000000..9546644
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ImportOffsets.scala
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.BufferedReader
+import java.io.FileReader
+import java.util.concurrent.TimeUnit
+import joptsimple._
+import kafka.api.OffsetCommitRequest
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.io.Source
+
+
+/**
+ * A utility that updates the offsets of a consumer group in ZooKeeper or the offset manager.
+ *
+ * This utility expects the following arguments:
+ *
+ * When using ZooKeeper:
+ * 1. consumer properties file
+ * 2. a file containing partition offset data, such as:
+ *    (this input file can be produced by running kafka.tools.ExportOffsets)
+ *
+ * When using the offset manager:
+ * 1. group name
+ * 2. input file name (file to read from)
+ * 3. broker list (host:port)
+ * 4. config file (optional)
+ *
+ * ZK format
+ * /consumers/group1/offsets/topic1/3-0:285038193
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ *
+ * OffsetManager file format
+ * topic/partition/offset
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ImportOffsets$=DEBUG
+ * (for Eclipse debugging, copy log4j.properties to the binary directory in "core", such as core/bin)
+ */
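+/*
+ * Illustrative invocations (a sketch, not part of the tool itself; host names,
+ * ports and file paths below are made-up examples -- the option names match
+ * the parser defined in this file):
+ *
+ *   Import via the offset manager (expects topic/partition/offset lines):
+ *     bin/kafka-run-class.sh kafka.tools.ImportOffsets \
+ *       --broker-list broker1:9092 --group group1 --input-file /tmp/group1.offsets
+ *
+ *   Import via ZooKeeper (expects znode-path:offset lines):
+ *     bin/kafka-run-class.sh kafka.tools.ImportOffsets \
+ *       --zkconnect zkhost:2181 --input-file /tmp/group1.offsets
+ */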
+object ImportOffsets extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+      .withRequiredArg()
+      .defaultsTo("localhost:2181")
+      .ofType(classOf[String])
+
+    val inFileOpt = parser.accepts("input-file", "Input file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket timeout for the offsets channel")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Maximum number of retries for committing offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val group = parser.accepts("group", "Consumer Group Id")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Import offsets from a file into ZooKeeper or the offset manager.")
+
+    val options = parser.parse(args: _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val allBrokers = options.valueOf(brokerList)
+
+    val socketTimeOut = {
+      val value = options.valueOf(offsetsChannelSocketTimeoutMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+
+    val channelBackOff = {
+      val value = options.valueOf(offsetsChannelBackoffMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+
+    val commitRetries = {
+      val value = options.valueOf(offsetsCommitMaxRetries)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
+    }
+
+    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
+
+    val partitionOffsetFile = options.valueOf(inFileOpt)
+    val groupId: String = options.valueOf(group)
+
+    // Decide whether to go through the offset manager (broker list given) or ZooKeeper.
+    // The broker list is only parsed inside this branch so that a missing
+    // --broker-list does not cause a NullPointerException on the ZooKeeper path.
+    if (allBrokers != null) {
+      val brokers = Utils.parseBrokerList(allBrokers)
+      val fileLines = Source.fromFile(partitionOffsetFile).getLines().toList
+      val offsetsToCommit = fileLines.map(line => {
+        val topicPartitionOffset = line.split("/")
+        val topicAndPartition = TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt)
+        // Offsets are longs; toInt would overflow for offsets beyond Int.MaxValue.
+        val offsetMetadata = OffsetAndMetadata(topicPartitionOffset(2).toLong)
+        debug(topicAndPartition + " -> " + offsetMetadata)
+        (topicAndPartition -> offsetMetadata)
+      }).toMap
+
+      // config is always constructed above, so its values can be read directly.
+      val offsetsCommitRetries = config.offsetsCommitRetries
+      val offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
+
+      if (offsetsToCommit.nonEmpty) {
+        val offsetCommitRequest = OffsetCommitRequest(groupId, offsetsToCommit, clientId = config.clientId)
+
+        var done = false
+        var retriesRemaining = 1 + offsetsCommitRetries
+
+        // Retry the commit until it succeeds or the retry budget is exhausted.
+        while (!done && retriesRemaining > 0) {
+          try {
+            OffsetClient.commitOffsets(config, brokers, groupId, offsetCommitRequest)
+            done = true
+          }
+          catch {
+            case e: Exception =>
+              done = false
+          }
+          retriesRemaining -= 1
+
+          if (!done && retriesRemaining > 0) {
+            debug("Retrying offset commit in %d ms".format(offsetsChannelBackoffMS))
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
+          }
+        }
+      }
+      else {
+        debug("No offsets to commit.")
+      }
+    }
+    else {
+      CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
+
+      val zkConnect = options.valueOf(zkConnectOpt)
+
+      val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+      updateZkOffsets(zkClient, partitionOffsets)
+    }
+  }
+
+  private def getPartitionOffsetsFromFile(filename: String): Map[String, String] = {
+    val fr = new FileReader(filename)
+    val br = new BufferedReader(fr)
+    var partOffsetsMap: Map[String, String] = Map()
+
+    var s: String = br.readLine()
+    while (s != null && s.length() >= 1) {
+      val tokens = s.split(":")
+
+      partOffsetsMap += tokens(0) -> tokens(1)
+      debug("adding node path [" + s + "]")
+
+      s = br.readLine()
+    }
+    br.close()
+
+    partOffsetsMap
+  }
+
+  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String, String]): Unit = {
+    for ((partition, offset) <- partitionOffsets) {
+      debug("updating [" + partition + "] with offset [" + offset + "]")
+
+      try {
+        ZkUtils.updatePersistentPath(zkClient, partition, offset)
+      } catch {
+        case e: Throwable => error("Could not update " + partition, e)
+      }
+    }
+  }
+}
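+/*
+ * For reference, sample input files in the two accepted formats (the offsets
+ * are made-up values):
+ *
+ *   Offset-manager format, one topic/partition/offset triple per line:
+ *     topic1/0/285038193
+ *     topic1/1/286894308
+ *
+ *   ZooKeeper format, full offset znode path and offset separated by a colon:
+ *     /consumers/group1/offsets/topic1/1-0:286894308
+ */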
diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
new file mode 100644
index 0000000..bcfa216
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+import kafka.api._
+import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition}
+import kafka.network.BlockingChannel
+import kafka.utils.Logging
+import kafka.cluster.Broker
+import scala.util.Random
+
+/**
+ * A utility that provides APIs to fetch and commit offsets.
+ */
+object OffsetClient extends Logging {
+
+  // Null-safe accessor: several entry points accept a null config and fall back to defaults.
+  private def clientId(config: OffsetClientConfig): String =
+    if (config != null) config.clientId else OffsetClientConfig.DefaultClientId
+
+  private def getQueryChannel(config: OffsetClientConfig, brokers: Seq[Broker]): BlockingChannel = {
+    val socketTimeoutMs = {
+      if (config != null)
+        config.offsetsChannelSocketTimeoutMs
+      else
+        OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+    var channel: BlockingChannel = null
+    var connected = false
+    val shuffledBrokers = Random.shuffle(brokers)
+    var i: Int = 0
+
+    while (!connected) {
+      val broker = shuffledBrokers(i)
+      i = (i + 1) % shuffledBrokers.size
+      trace("Connecting to broker [%s:%d]...".format(broker.host, broker.port))
+      try {
+        channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+        channel.connect()
+        debug("Created channel to broker [%s:%d].".format(channel.host, channel.port))
+      }
+      catch {
+        case e: Exception =>
+          if (channel != null) channel.disconnect()
+          channel = null
+          error("Error while creating channel to [%s:%d]. Retrying...".format(broker.host, broker.port), e)
+      }
+      // Guard against the null channel left behind by a failed connect attempt.
+      connected = if (channel == null) false else channel.isConnected
+    }
+
+    channel
+  }
+
+  /**
+   * Creates a blocking channel to the offset manager of the given group.
+   */
+  def getOffsetManagerChannel(config: OffsetClientConfig, brokers: Seq[Broker], group: String): Option[BlockingChannel] = {
+    val (offsetsChannelBackoffMs, socketTimeoutMs) = {
+      if (config != null)
+        (config.offsetsChannelBackoffMs, config.offsetsChannelSocketTimeoutMs)
+      else
+        (OffsetClientConfig.OffsetsChannelBackoffMs, OffsetClientConfig.OffsetsChannelSocketTimeoutMs)
+    }
+    var offSetManagerChannel: BlockingChannel = null
+    var queryChannel = getQueryChannel(config, brokers)
+    var offSetManagerChannelOpt: Option[BlockingChannel] = None
+
+    while (!offSetManagerChannelOpt.isDefined) {
+
+      var offsetManagerOpt: Option[Broker] = None
+
+      // First ask any broker where the offset manager (consumer coordinator) for this group lives.
+      while (!offsetManagerOpt.isDefined) {
+        try {
+          if (!queryChannel.isConnected) {
+            queryChannel = getQueryChannel(config, brokers)
+          }
+          debug("Querying [%s:%d] to locate offset manager for group [%s].".format(queryChannel.host, queryChannel.port, group))
+          queryChannel.send(ConsumerMetadataRequest(group, ConsumerMetadataRequest.CurrentVersion, 0, clientId(config)))
+          val response = queryChannel.receive()
+          val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
+          debug("Consumer metadata response: " + consumerMetadataResponse)
+          if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) {
+            offsetManagerOpt = consumerMetadataResponse.coordinatorOpt
+          }
+          else {
+            debug("Query to [%s:%d] to locate offset manager for group [%s] failed - will retry in %d milliseconds."
+              .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMs))
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+          }
+        }
+        catch {
+          case ioe: IOException =>
+            error("Error while connecting to [%s:%d] for fetching consumer metadata".format(queryChannel.host, queryChannel.port), ioe)
+            queryChannel.disconnect()
+        }
+      }
+
+      // If the coordinator happens to be the broker we already queried, reuse that channel.
+      val offsetManager = offsetManagerOpt.get
+      if (offsetManager.host == queryChannel.host && offsetManager.port == queryChannel.port) {
+        offSetManagerChannelOpt = Some(queryChannel)
+      }
+      else {
+        val connectString = "[%s:%d]".format(offsetManager.host, offsetManager.port)
+        try {
+          offSetManagerChannel = new BlockingChannel(offsetManager.host, offsetManager.port, BlockingChannel.UseDefaultBufferSize,
+            BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+          offSetManagerChannel.connect()
+          offSetManagerChannelOpt = Some(offSetManagerChannel)
+          info("Connected to offset manager [%s:%d] for group [%s].".format(offSetManagerChannel.host, offSetManagerChannel.port, group))
+          queryChannel.disconnect()
+        }
+        catch {
+          case ioe: IOException =>
+            error("Error while connecting to %s.".format(connectString), ioe)
+            if (offSetManagerChannel != null) offSetManagerChannel.disconnect()
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+            offSetManagerChannelOpt = None
+        }
+      }
+    }
+
+    offSetManagerChannelOpt
+  }
+
+  def fetchTopicMetadata(config: OffsetClientConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = {
+    var fetchMetadataSucceeded = false
+    var i: Int = 0
+    var topicMetadataChannel = getQueryChannel(config, brokers)
+    var topicMetaDataResponse: TopicMetadataResponse = null
+    var t: Throwable = null
+
+    // Try at most brokers.size times before giving up.
+    while (i < brokers.size && !fetchMetadataSucceeded) {
+      try {
+        if (!topicMetadataChannel.isConnected) {
+          topicMetadataChannel = getQueryChannel(config, brokers)
+        }
+        info("Fetching metadata from broker [%s:%d]".format(topicMetadataChannel.host, topicMetadataChannel.port))
+        val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, clientId(config), topics)
+        topicMetadataChannel.send(topicMetaDataRequest)
+        topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer)
+        fetchMetadataSucceeded = true
+        topicMetadataChannel.disconnect()
+      }
+      catch {
+        case e: Exception =>
+          warn("Fetching metadata for topics %s from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port), e)
+          topicMetadataChannel.disconnect()
+          t = e
+      }
+      finally {
+        i = i + 1
+      }
+    }
+
+    if (!fetchMetadataSucceeded) {
+      throw new KafkaException("Fetching topic metadata for topics %s from brokers %s failed".format(topics, brokers), t)
+    } else {
+      debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
+    }
+
+    topicMetaDataResponse
+  }
+
+
+  def fetchOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse = {
+    val offsetsChannelBackoffMs = {
+      if (config != null)
+        config.offsetsChannelBackoffMs
+      else
+        OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+    var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
+    while (!offsetFetchResponseOpt.isDefined) {
+      val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get
+      try {
+        offsetManagerChannel.send(offsetFetchRequest)
+        val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer)
+        trace("Offset fetch response: %s.".format(offsetFetchResponse))
+
+        // Scan the per-partition error codes: NotCoordinatorForConsumer means the offset
+        // manager has moved; OffsetsLoadInProgress means the offset cache is still loading.
+        val (leaderChanged, loadInProgress) =
+          offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) =>
+            (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
+             folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+          }
+
+        if (leaderChanged) {
+          offsetManagerChannel.disconnect()
+          debug("Could not fetch offsets (because offset manager has moved).")
+        }
+        else if (loadInProgress) {
+          debug("Could not fetch offsets (because offset cache is being loaded).")
+        }
+        else {
+          offsetFetchResponseOpt = Some(offsetFetchResponse)
+          offsetManagerChannel.disconnect()
+        }
+      }
+      catch {
+        case e: Exception =>
+          warn("Error while fetching offsets from [%s:%d]".format(offsetManagerChannel.host, offsetManagerChannel.port), e)
+      }
+      finally {
+        if (offsetManagerChannel.isConnected)
+          offsetManagerChannel.disconnect()
+      }
+
+      if (offsetFetchResponseOpt.isEmpty) {
+        debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMs))
+        TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+      }
+    }
+
+    offsetFetchResponseOpt.get
+  }
+
+  def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse = {
+    var offsetCommitResponse: OffsetCommitResponse = null
+
+    val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get
+    try {
+      offsetManagerChannel.send(offsetCommitRequest)
+      offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer)
+      trace("Offset commit response: %s.".format(offsetCommitResponse))
+
+      val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
+        offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
+
+          (folded._1 || // update commitFailed
+            errorCode != ErrorMapping.NoError,
+
+           folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
+            (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+
+           folded._3 || // update shouldRefreshCoordinator
+            errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+            errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+
+           // update error count
+           folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+        }
+      }
+      debug(errorCount + " errors in offset commit response.")
+
+      if (shouldRefreshCoordinator) {
+        debug("Could not commit offsets (because offset manager has moved or is unavailable).")
+        offsetManagerChannel.disconnect()
+      }
+
+      if (commitFailed && retryableIfFailed)
+        debug("Offset commit failed with a retryable error; callers may retry after a backoff.")
+    }
+    catch {
+      case t: Throwable =>
+        error("Error while committing offsets.", t)
+        offsetManagerChannel.disconnect()
+        throw new KafkaException(t)
+    }
+    finally {
+      if (offsetManagerChannel.isConnected)
+        offsetManagerChannel.disconnect()
+    }
+
+    offsetCommitResponse
+  }
+}
diff --git a/core/src/main/scala/kafka/tools/OffsetClientConfig.scala b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala
new file mode 100644
index 0000000..69f4ce6
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools
+
+import kafka.common.Config
+
+object OffsetClientConfig extends Config {
+  val OffsetsChannelSocketTimeoutMs = 10000
+  val OffsetsChannelBackoffMs = 1000
+  val DefaultClientId = ""
+  val OffsetsCommitMaxRetries = 5
+}
+
+case class OffsetClientConfig(offsetsChannelSocketTimeoutMs: Int = OffsetClientConfig.OffsetsChannelSocketTimeoutMs,
+                              offsetsChannelBackoffMs: Int = OffsetClientConfig.OffsetsChannelBackoffMs,
+                              offsetsCommitRetries: Int = OffsetClientConfig.OffsetsCommitMaxRetries,
+                              clientId: String = OffsetClientConfig.DefaultClientId) {
+
+  def this(clientId: String) {
+    this(OffsetClientConfig.OffsetsChannelSocketTimeoutMs, OffsetClientConfig.OffsetsChannelBackoffMs,
+      OffsetClientConfig.OffsetsCommitMaxRetries, clientId)
+  }
+}
-- 
1.9.3 (Apple Git-50)