From 440325d65348cf6683301a3e859f2da6f5c792c6 Mon Sep 17 00:00:00 2001
From: mgharat
Date: Sun, 21 Dec 2014 14:39:06 -0800
Subject: [PATCH 1/5] Reverted Changes to ZookeeperConsumerConnector.scala

---
 .../src/main/scala/kafka/tools/ExportOffsets.scala | 243 +++++++++++++++++
 .../src/main/scala/kafka/tools/ImportOffsets.scala | 229 ++++++++++++++++
 core/src/main/scala/kafka/tools/OffsetClient.scala | 303 +++++++++++++++++++++
 .../scala/kafka/tools/OffsetClientConfig.scala     |  39 +++
 4 files changed, 814 insertions(+)
 create mode 100644 core/src/main/scala/kafka/tools/ExportOffsets.scala
 create mode 100644 core/src/main/scala/kafka/tools/ImportOffsets.scala
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClient.scala
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClientConfig.scala

diff --git a/core/src/main/scala/kafka/tools/ExportOffsets.scala b/core/src/main/scala/kafka/tools/ExportOffsets.scala
new file mode 100644
index 0000000..024a033
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ExportOffsets.scala
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools
+
+import java.io.{BufferedWriter, File, FileWriter}
+import joptsimple._
+import kafka.api.{OffsetFetchRequest, TopicMetadataResponse, OffsetFetchResponse}
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+
+
+/**
+ * A utility that retrieves the offsets of broker partitions from ZK or the OffsetManager and
+ * prints them to an output file in the following format:
+ *
+ * ZK format
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ * /consumers/group1/offsets/topic1/2-0:284803985
+ *
+ * OffsetManager file format
+ * topic/partition/offset
+ *
+ * This utility expects the following arguments:
+ * For using Zookeeper:
+ * 1. Zk host:port string
+ * 2. group name (all groups implied if omitted)
+ * 3. output filename
+ *
+ * For using the OffsetManager:
+ * 1. group name
+ * 2. output filename
+ * 3. broker list (host:port)
+ * 4. config file (optional)
+ * 5. topic list (optional)
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ExportOffsets$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+object ExportOffsets extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+      .withRequiredArg()
+      .defaultsTo("localhost:2181")
+      .ofType(classOf[String])
+
+    val groupOpt = parser.accepts("group", "Consumer group.")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val outFileOpt = parser.accepts("output-file", "Output file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket timeout for the offset channel")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for committing offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val topicList = parser.accepts("topics", "Topics for which offsets/partitions should be fetched")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.")
+
+    val options = parser.parse(args: _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val allBrokers = options.valueOf(brokerList)
+    val brokers = Utils.parseBrokerList(allBrokers)
+
+    var topics = collection.Seq[String]()
+    val topicListValue = options.valueOf(topicList)
+    if (topicListValue != null) {
+      topics = topicListValue.split(",").toSeq
+    }
+
+    var value: String = null
+    val socketTimeOut = {
+      value = options.valueOf(offsetsChannelSocketTimeoutMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+
+    val channelBackOff = {
+      value = options.valueOf(offsetsChannelBackoffMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+
+    val commitRetries = {
+      value = options.valueOf(offsetsCommitMaxRetries)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
+    }
+
+    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
+
+    val outfile = options.valueOf(outFileOpt)
+
+    val groupId = options.valueOf(groupOpt)
+
+    // Use the offset manager if a broker list was given, otherwise fall back to ZooKeeper
+    if (allBrokers != null) {
+      val file = new File(outfile)
+      var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None
+      var topicAndPartition = collection.Seq[TopicAndPartition]()
+
+      val offsetsChannelBackoffMs = {
+        if (config != null) config.offsetsChannelBackoffMs
+        else OffsetClientConfig.OffsetsChannelBackoffMs
+      }
+
+      if (topics.size > 0) {
+        val topicMetaDataResponse: TopicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, topics)
+        topicAndPartition = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => {
+          val topic = topicMetaData.topic
+          
topicMetaData.partitionsMetadata.map(partitionMetadata => { + new TopicAndPartition(topic, partitionMetadata.partitionId) + } + ) + } + ) + } + + val offsetFetchRequest = OffsetFetchRequest(groupId, requestInfo = topicAndPartition, clientId = config.clientId) + + while (!offSetFetchResponseOpt.isDefined) { + val response = OffsetClient.fetchOffsets(config, brokers, groupId, offsetFetchRequest) + println(response) + offSetFetchResponseOpt = Some(response) + } + val offsetFetchResponse = offSetFetchResponseOpt.get + val topicPartitionOffsetMap = offsetFetchResponse.requestInfo + + // writing to file + val writer = new BufferedWriter(new FileWriter(file)) + topicPartitionOffsetMap.foreach(tpo => { + val topic = tpo._1.topic + val partition = tpo._1.partition + val offset = tpo._2.offset + writer.write(topic + "/" + partition + "/" + offset) + writer.newLine() + }) + writer.flush() + writer.close() + } + else { + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) + + val zkConnect = options.valueOf(zkConnectOpt) + val groups = options.valuesOf(groupOpt) + + var zkClient: ZkClient = null + val fileWriter: FileWriter = new FileWriter(outfile) + + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + var consumerGroups: Seq[String] = null + + if (groups.size == 0) { + consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList + } + else { + import scala.collection.JavaConversions._ + consumerGroups = groups + } + + for (consumerGrp <- consumerGroups) { + val topicsList = getTopicsList(zkClient, consumerGrp) + + for (topic <- topicsList) { + val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) + + for (bidPid <- bidPidList) { + val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic) + val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid + ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { + case Some(offsetVal) => + fileWriter.write(offsetPath + ":" + offsetVal + "\n") + debug(offsetPath + " => " + offsetVal) + case None => + error("Could not retrieve offset value from " + offsetPath) + } + } + } + } + } + finally { + fileWriter.flush() + fileWriter.close() + } + } + } + + private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { + return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList + } + + private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { + return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList + } +} + diff --git a/core/src/main/scala/kafka/tools/ImportOffsets.scala b/core/src/main/scala/kafka/tools/ImportOffsets.scala new file mode 100644 index 0000000..9546644 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ImportOffsets.scala @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.BufferedReader
+import java.io.FileReader
+import java.util.concurrent.TimeUnit
+import joptsimple._
+import kafka.api.OffsetCommitRequest
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.io.Source
+
+
+/**
+ * A utility that updates the offsets of broker partitions in ZK or the OffsetManager.
+ *
+ * This utility expects the following arguments:
+ *
+ * For using Zookeeper:
+ * 1. consumer properties file
+ * 2. a file containing partition offset data such as:
+ *    (This output data file can be obtained by running kafka.tools.ExportOffsets)
+ *
+ * For using the OffsetManager:
+ * 1. group name
+ * 2. input filename (file to read from)
+ * 3. broker list (host:port)
+ * 4. config file (optional)
+ *
+ * ZK format
+ * /consumers/group1/offsets/topic1/3-0:285038193
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ *
+ * OffsetManager file format
+ * topic/partition/offset
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ImportOffsets$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+object ImportOffsets extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+      .withRequiredArg()
+      .defaultsTo("localhost:2181")
+      .ofType(classOf[String])
+
+    val inFileOpt = parser.accepts("input-file", "Input file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket timeout for the offset channel")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for committing offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val group = parser.accepts("group", "Consumer Group Id")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.")
+
+    val options = parser.parse(args: _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val allBrokers = options.valueOf(brokerList)
+    val brokers = Utils.parseBrokerList(allBrokers)
+
+    var value: String = null
+    val socketTimeOut = {
+      value = options.valueOf(offsetsChannelSocketTimeoutMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+
+    val channelBackOff = {
+      value = options.valueOf(offsetsChannelBackoffMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+
+    val commitRetries = {
+      value = options.valueOf(offsetsCommitMaxRetries)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
+    }
+
+    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
+
+    val partitionOffsetFile = options.valueOf(inFileOpt)
+
+    val groupId: String = options.valueOf(group)
+
+    // Use the offset manager if a broker list was given, otherwise fall back to ZooKeeper
+    if (allBrokers != null) {
+      val fileLines = Source.fromFile(partitionOffsetFile).getLines().toList
+      val offsetsToCommit = fileLines.map(line => {
+        val topicPartitionOffset = line.split("/")
+        val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt)
+        // Offsets are longs, so parse with toLong to avoid overflow on large offsets
+        val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toLong)
+        debug(topicAndPartition + " -> " + offsetMetadata)
+        (topicAndPartition -> offsetMetadata)
+      }).toMap
+
+      var done = false
+      var offsetsCommitRetries = 0
+      var offsetsChannelBackoffMS = 0
+
+      if (config != null) {
+        offsetsCommitRetries = config.offsetsCommitRetries
+        offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
+      }
+      else {
+        offsetsCommitRetries = OffsetClientConfig.OffsetsCommitMaxRetries
+        offsetsChannelBackoffMS = OffsetClientConfig.OffsetsChannelBackoffMs
+      }
+
+      var retriesRemaining = 1 + offsetsCommitRetries
+
+      if (offsetsToCommit.size > 0) {
+        val offsetCommitRequest = OffsetCommitRequest(groupId, offsetsToCommit, clientId = config.clientId)
+
+        // Retry until the commit succeeds or the retry budget is exhausted
+        while (!done && retriesRemaining > 0) {
+          try {
+            OffsetClient.commitOffsets(config, brokers, groupId, offsetCommitRequest)
+            done = true
+          }
+          catch {
+            case e: Exception =>
+              done = false
+          }
+          retriesRemaining = retriesRemaining - 1
+
+          if (!done && retriesRemaining > 0) {
+            debug("Retrying offset commit in %d ms".format(offsetsChannelBackoffMS))
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
+          }
+        }
+      }
+      else {
+        debug("No offsets to commit.")
+      }
+    }
+    else {
+      CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
+
+      val zkConnect = options.valueOf(zkConnectOpt)
+
+      val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+      updateZkOffsets(zkClient, partitionOffsets)
+    }
+  }
+
+  private def getPartitionOffsetsFromFile(filename: String): Map[String, String] = {
+    val fr = new FileReader(filename)
+    val br = new BufferedReader(fr)
+    var partOffsetsMap: Map[String, String] = Map()
+
+    var s: String = br.readLine()
+    while (s != null && s.length() >= 1) {
+      val tokens = s.split(":")
+
+      partOffsetsMap += tokens(0) -> tokens(1)
+      debug("adding node path [" + s + "]")
+
+      s = br.readLine()
+    }
+
+    partOffsetsMap
+  }
+
+  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String, String]): Unit = {
+    for ((partition, offset) <- partitionOffsets) {
+      debug("updating [" + partition + "] with offset [" + offset + "]")
+
+      try {
+        ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
+      } catch {
+        case e: Throwable => e.printStackTrace()
+      }
+    }
+  }
+}

diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
new file mode 100644
index 0000000..bcfa216
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.tools + +import java.io.IOException +import java.util.concurrent.TimeUnit + +import kafka.api._ +import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition} +import kafka.network.BlockingChannel +import kafka.utils.Logging +import kafka.cluster.Broker +import scala.util.Random + +/** + * A utility that provides APIs to fetch and commit offsets + * + */ +object OffsetClient extends Logging { + + private def getQueryChannel(config: OffsetClientConfig, brokers: Seq[Broker]): BlockingChannel = { + var socketTimeoutMs = { + if (config != null) + config.offsetsChannelSocketTimeoutMs + else + OffsetClientConfig.OffsetsChannelSocketTimeoutMs + } + var channel: BlockingChannel = null + var connected = false + val shuffledBrokers = Random.shuffle(brokers) + var i: Int = 0 + + while (!connected) { + val broker = shuffledBrokers(i) + i = (i + 1) % shuffledBrokers.size + trace("Connecting to broker...") + try { + channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) + channel.connect() + debug("Created channel to broker [%s:%d].".format(channel.host, channel.port)) + } + catch { + case e: Exception => + if (channel != null) channel.disconnect() + channel = null + error("Error while creating channel to [%s:%d]. 
Retrying...".format(broker.host, broker.port)) + } + connected = channel.isConnected + } + + channel + } + + /** + * Creates a blocking channel to the offset manager of the given group + */ + def getOffsetManagerChannel(config: OffsetClientConfig, brokers: Seq[Broker], group: String): Option[BlockingChannel] = { + var (offsetsChannelBackoffMs, socketTimeoutMs) = { + if (config != null) { + (config.offsetsChannelBackoffMs, config.offsetsChannelSocketTimeoutMs) + } + else { + (OffsetClientConfig.OffsetsChannelBackoffMs, OffsetClientConfig.OffsetsChannelSocketTimeoutMs) + } + } + var offSetManagerChannel: BlockingChannel = null + var queryChannel = getQueryChannel(config, brokers) + var offSetManagerChannelOpt: Option[BlockingChannel] = None + + while (!offSetManagerChannelOpt.isDefined) { + + var offsetManagerOpt: Option[Broker] = None + + while (!offsetManagerOpt.isDefined) { + try { + if (!queryChannel.isConnected) { + queryChannel = getQueryChannel(config, brokers) + } + debug("Querying [%s:%d] to locate offset manager for group [%s].".format(queryChannel.host, queryChannel.port, group)) + queryChannel.send(ConsumerMetadataRequest(group, ConsumerMetadataRequest.CurrentVersion, 0, config.clientId)) + val response = queryChannel.receive() + val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) + debug("Consumer metadata response: " + consumerMetadataResponse)//.coordinatorOpt.get.id) + if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) { + offsetManagerOpt = consumerMetadataResponse.coordinatorOpt + } + else { + debug("Query to [%s:%d] to locate offset manager for group [%s] failed - will retry in %d milliseconds." + .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMs)) + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + } + } + catch { + case ioe: IOException => + error("Error while connecting to [%s:%d] for fetching consumer metadata".format(queryChannel.host, queryChannel.port)) + queryChannel.disconnect() + } + } + + val offsetManager = offsetManagerOpt.get + if (offsetManager.host == queryChannel.host && offsetManager.port == queryChannel.port) { + offSetManagerChannelOpt = Some(queryChannel) + } + else { + val connectString = "[%s:%d]".format(offsetManager.host, offsetManager.port) + try { + offSetManagerChannel = new BlockingChannel(offsetManager.host, offsetManager.port, BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) + offSetManagerChannel.connect() + offSetManagerChannelOpt = Some(offSetManagerChannel) + info("Connected to offset manager [%s:%d] for group [%s].".format(offSetManagerChannel.host, offSetManagerChannel.port, group)) + queryChannel.disconnect() + } + catch { + case ioe: IOException => + error("Error while connecting to %s.".format(connectString)) + if (offSetManagerChannel != null) offSetManagerChannel.disconnect() + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + offSetManagerChannelOpt = None + } + } + } + + offSetManagerChannelOpt + } + + def fetchTopicMetadata(config: OffsetClientConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = { + var topicAndPartitions = List[TopicAndPartition]() + var fetchMetadataSucceeded = false + var i: Int = 0 + var topicMetadataChannel = getQueryChannel(config, brokers) + var topicMetaDataResponse: TopicMetadataResponse = null + var t: Throwable = null + + while (i < brokers.size && !fetchMetadataSucceeded) { + try { + if (!topicMetadataChannel.isConnected) { + topicMetadataChannel = 
getQueryChannel(config, brokers) + } + info("Fetching metadata from broker [%s:%d]".format(topicMetadataChannel.host, topicMetadataChannel.port)) + val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics) + topicMetadataChannel.send(topicMetaDataRequest) + topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer) + fetchMetadataSucceeded = true + topicMetadataChannel.disconnect() + } + catch { + case e: Exception => + warn("Fetching metadata for topics %s from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port)) + topicMetadataChannel.disconnect() + t = e + } + finally + { + i = i + 1 + } + + // TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) + } + + if(!fetchMetadataSucceeded) { + throw new KafkaException("fetching topic metadata for topics %s from broker [%s] failed".format(topics, brokers), t) + } else { + debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) + } + + topicMetaDataResponse + } + + + def fetchOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse = { + var offsetsChannelBackoffMs = 0 + if (config != null) { + offsetsChannelBackoffMs = config.offsetsChannelBackoffMs + } + else { + offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs + } + var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None + while (!offsetFetchResponseOpt.isDefined) { + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get + try { + offsetManagerChannel.send(offsetFetchRequest) + val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset fetch response: %s.".format(offsetFetchResponse)) + + val (leaderChanged, loadInProgress) = + offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) => + (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode), + folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode)) + } + + if (leaderChanged) { + offsetManagerChannel.disconnect() + debug("Could not fetch offsets (because offset manager has moved).") + } + else if (loadInProgress) { + debug("Could not fetch offsets (because offset cache is being loaded).") + } + else { + offsetFetchResponseOpt = Some(offsetFetchResponse) + offsetManagerChannel.disconnect() + } + } + catch { + case e: Exception => + warn("Error while fetching offsets from [%s:%d]".format(offsetManagerChannel.host, offsetManagerChannel.port)) + } + finally + { + if(offsetManagerChannel.isConnected) + offsetManagerChannel.disconnect() + } + + if (offsetFetchResponseOpt.isEmpty) { + debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMs)) + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + } + } + + offsetFetchResponseOpt.get + } + + def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse = { + var committed = false + var offsetCommitResponse: OffsetCommitResponse = null + + + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get + try { + offsetManagerChannel.send(offsetCommitRequest) + offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset commit 
response: %s.".format(offsetCommitResponse)) + + val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { + offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => + + (folded._1 || //update commitFailed + errorCode != ErrorMapping.NoError, + + folded._2 || //update retryableIfFailed - (only metadata too large is not retryable) + (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), + + folded._3 || //update shouldRefreshCoordinator + errorCode == ErrorMapping.NotCoordinatorForConsumerCode || + errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, + + //update error count + folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) + } + } + debug(errorCount + " errors in offset commit response.") + + if (shouldRefreshCoordinator) { + debug("Could not commit offsets (because offset manager has moved or is unavailable).") + offsetManagerChannel.disconnect() + } + + if (commitFailed && retryableIfFailed) { + committed = false + } + else { + committed = true + offsetManagerChannel.disconnect() + } + } + catch { + case t: Throwable => + error("Error while committing offsets.", t) + offsetManagerChannel.disconnect() + throw new KafkaException(t) + } + finally + { + if(offsetManagerChannel.isConnected) + offsetManagerChannel.disconnect() + } + + offsetCommitResponse + } + + // def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], offsetCommitRequest: kafka.javaapi.OffsetCommitRequest, group: String): Boolean = { + // return commitOffsets(config, brokers, offsetCommitRequest.underlying, group) + // } +} + + + + diff --git a/core/src/main/scala/kafka/tools/OffsetClientConfig.scala b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala new file mode 100644 index 0000000..69f4ce6 --- /dev/null +++ b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.tools + +import kafka.common.Config + +object OffsetClientConfig extends Config { + val OffsetsChannelSocketTimeoutMs = 10000 + val OffsetsChannelBackoffMs = 1000 + val DefaultClientId = "" + val OffsetsCommitMaxRetries = 5 +} + +case class OffsetClientConfig(offsetsChannelSocketTimeoutMs: Int = OffsetClientConfig.OffsetsChannelSocketTimeoutMs, + offsetsChannelBackoffMs: Int = OffsetClientConfig.OffsetsChannelBackoffMs, + offsetsCommitRetries: Int = OffsetClientConfig.OffsetsCommitMaxRetries, + clientId: String = OffsetClientConfig.DefaultClientId) { + + def this(clientId: String) { + this(OffsetClientConfig.OffsetsChannelSocketTimeoutMs, OffsetClientConfig.OffsetsChannelBackoffMs, + OffsetClientConfig.OffsetsCommitMaxRetries, clientId) + } +} + + -- 1.9.3 (Apple Git-50) From edc5e68bb4de9b4ca9517f87a0967ff88d607e43 Mon Sep 17 00:00:00 2001 From: mgharat Date: Sun, 21 Dec 2014 14:43:54 -0800 Subject: [PATCH 2/5] Reverted changes to ZookeeperConsumerConnector.scala --- core/src/main/scala/kafka/client/ClientUtils.scala | 101 ++--------------- .../main/scala/kafka/consumer/SimpleConsumer.scala | 17 --- .../kafka/javaapi/consumer/SimpleConsumer.scala | 20 ---- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 4 +- .../main/scala/kafka/tools/ExportZkOffsets.scala | 124 --------------------- .../main/scala/kafka/tools/ImportZkOffsets.scala | 106 ------------------ core/src/main/scala/kafka/utils/Utils.scala | 19 ++++ .../test/scala/other/kafka/TestOffsetManager.scala | 13 ++- .../scala/unit/kafka/server/OffsetCommitTest.scala | 41 ++++--- 9 files changed, 59 insertions(+), 386 deletions(-) delete mode 100644 core/src/main/scala/kafka/tools/ExportZkOffsets.scala delete mode 100644 core/src/main/scala/kafka/tools/ImportZkOffsets.scala diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index ebba87f..03a5786 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -16,12 +16,14 @@ */ package kafka.client -import scala.collection._ + import kafka.tools.{OffsetClient, OffsetClientConfig} + + import scala.collection._ import kafka.cluster._ import kafka.api._ import kafka.producer._ import kafka.common.{ErrorMapping, KafkaException} -import kafka.utils.{Utils, Logging} +import kafka.utils.{ZkUtils, Utils, Logging} import java.util.Properties import util.Random import kafka.network.BlockingChannel @@ -43,36 +45,9 @@ object ClientUtils extends Logging{ * @return topic metadata response */ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { - var fetchMetaDataSucceeded: Boolean = false - var i: Int = 0 - val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) - var topicMetadataResponse: TopicMetadataResponse = null - var t: Throwable = null - // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the - // same broker - val shuffledBrokers = Random.shuffle(brokers) - while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) { - val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i)) - info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics)) - try { - topicMetadataResponse = 
producer.send(topicMetadataRequest) - fetchMetaDataSucceeded = true - } - catch { - case e: Throwable => - warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed" - .format(correlationId, topics, shuffledBrokers(i).toString), e) - t = e - } finally { - i = i + 1 - producer.close() - } - } - if(!fetchMetaDataSucceeded) { - throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t) - } else { - debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) - } + val config = new OffsetClientConfig(producerConfig.requestTimeoutMs, producerConfig.retryBackoffMs, producerConfig.messageSendMaxRetries, producerConfig.clientId) + val allTopics = topics.toSeq + val topicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, allTopics) return topicMetadataResponse } @@ -137,64 +112,8 @@ object ClientUtils extends Logging{ * Creates a blocking channel to the offset manager of the given group */ def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = { - var queryChannel = channelToAnyBroker(zkClient) - - var offsetManagerChannelOpt: Option[BlockingChannel] = None - - while (!offsetManagerChannelOpt.isDefined) { - - var coordinatorOpt: Option[Broker] = None - - while (!coordinatorOpt.isDefined) { - try { - if (!queryChannel.isConnected) - queryChannel = channelToAnyBroker(zkClient) - debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) - queryChannel.send(ConsumerMetadataRequest(group)) - val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) - debug("Consumer metadata response: " + consumerMetadataResponse.toString) - if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) - coordinatorOpt = consumerMetadataResponse.coordinatorOpt - else { - debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds." 
- .format(queryChannel.host, queryChannel.port, group, retryBackOffMs)) - Thread.sleep(retryBackOffMs) - } - } - catch { - case ioe: IOException => - info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port)) - queryChannel.disconnect() - } - } - - val coordinator = coordinatorOpt.get - if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) { - offsetManagerChannelOpt = Some(queryChannel) - } else { - val connectString = "%s:%d".format(coordinator.host, coordinator.port) - var offsetManagerChannel: BlockingChannel = null - try { - debug("Connecting to offset manager %s.".format(connectString)) - offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - socketTimeoutMs) - offsetManagerChannel.connect() - offsetManagerChannelOpt = Some(offsetManagerChannel) - queryChannel.disconnect() - } - catch { - case ioe: IOException => // offsets manager may have moved - info("Error while connecting to %s.".format(connectString)) - if (offsetManagerChannel != null) offsetManagerChannel.disconnect() - Thread.sleep(retryBackOffMs) - offsetManagerChannelOpt = None // just in case someone decides to change shutdownChannel to not swallow exceptions - } - } - } - - offsetManagerChannelOpt.get + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val config = OffsetClientConfig(socketTimeoutMs, retryBackOffMs) + OffsetClient.getOffsetManagerChannel(config, brokers, group).get } } diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index e53ee51..00ba806 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -126,23 +126,6 @@ class SimpleConsumer(val host: String, */ def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) - /** - * Commit offsets for a topic - * @param request a [[kafka.api.OffsetCommitRequest]] object. - * @return a [[kafka.api.OffsetCommitResponse]] object. - */ - def commitOffsets(request: OffsetCommitRequest) = { - // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before - // we can commit offsets. - OffsetCommitResponse.readFrom(sendRequest(request).buffer) - } - - /** - * Fetch offsets for a topic - * @param request a [[kafka.api.OffsetFetchRequest]] object. - * @return a [[kafka.api.OffsetFetchResponse]] object. - */ - def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) private def getOrMakeConnection() { if(!isClosed && !blockingChannel.isConnected) { diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 0ab0195..cba075d 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -79,26 +79,6 @@ class SimpleConsumer(val host: String, underlying.getOffsetsBefore(request.underlying) } - /** - * Commit offsets for a topic - * @param request a [[kafka.javaapi.OffsetCommitRequest]] object. - * @return a [[kafka.javaapi.OffsetCommitResponse]] object. 
- */ - def commitOffsets(request: kafka.javaapi.OffsetCommitRequest): kafka.javaapi.OffsetCommitResponse = { - import kafka.javaapi.Implicits._ - underlying.commitOffsets(request.underlying) - } - - /** - * Fetch offsets for a topic - * @param request a [[kafka.javaapi.OffsetFetchRequest]] object. - * @return a [[kafka.javaapi.OffsetFetchResponse]] object. - */ - def fetchOffsets(request: kafka.javaapi.OffsetFetchRequest): kafka.javaapi.OffsetFetchResponse = { - import kafka.javaapi.Implicits._ - underlying.fetchOffsets(request.underlying) - } - def close() { underlying.close } diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..c8c223b 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -158,7 +158,9 @@ object ConsumerOffsetChecker extends Logging { topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) + val config = OffsetClientConfig(channelSocketTimeoutMs, channelRetryBackoffMs) + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val channel = OffsetClient.getOffsetManagerChannel(config, brokers, group).get debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala deleted file mode 100644 index 4d051bc..0000000 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.FileWriter -import joptsimple._ -import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils} -import org.I0Itec.zkclient.ZkClient - - -/** - * A utility that retrieve the offset of broker partitions in ZK and - * prints to an output file in the following format: - * - * /consumers/group1/offsets/topic1/1-0:286894308 - * /consumers/group1/offsets/topic1/2-0:284803985 - * - * This utility expects 3 arguments: - * 1. Zk host:port string - * 2. group name (all groups implied if omitted) - * 3. 
output filename - * - * To print debug message, add the following line to log4j.properties: - * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG - * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) - */ -object ExportZkOffsets extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") - .withRequiredArg() - .defaultsTo("localhost:2181") - .ofType(classOf[String]) - val groupOpt = parser.accepts("group", "Consumer group.") - .withRequiredArg() - .ofType(classOf[String]) - val outFileOpt = parser.accepts("output-file", "Output file") - .withRequiredArg() - .ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - val groups = options.valuesOf(groupOpt) - val outfile = options.valueOf(outFileOpt) - - var zkClient : ZkClient = null - val fileWriter : FileWriter = new FileWriter(outfile) - - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - - var consumerGroups: Seq[String] = null - - if (groups.size == 0) { - consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList - } - else { - import scala.collection.JavaConversions._ - consumerGroups = groups - } - - for (consumerGrp <- consumerGroups) { - val topicsList = getTopicsList(zkClient, consumerGrp) - - for (topic <- topicsList) { - val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) - - for (bidPid <- bidPidList) { - val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic) - val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid - ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { - case Some(offsetVal) => - fileWriter.write(offsetPath + ":" + offsetVal + "\n") - debug(offsetPath + " => " + offsetVal) - case None => - error("Could not retrieve offset value from " + offsetPath) - } - } - } - } - } - finally { - fileWriter.flush() - fileWriter.close() - } - } - - private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { - return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList - } - - private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { - return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList - } -} diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala deleted file mode 100644 index abe0972..0000000 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.BufferedReader -import java.io.FileReader -import joptsimple._ -import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils} -import org.I0Itec.zkclient.ZkClient - - -/** - * A utility that updates the offset of broker partitions in ZK. - * - * This utility expects 2 input files as arguments: - * 1. consumer properties file - * 2. a file contains partition offsets data such as: - * (This output data file can be obtained by running kafka.tools.ExportZkOffsets) - * - * /consumers/group1/offsets/topic1/3-0:285038193 - * /consumers/group1/offsets/topic1/1-0:286894308 - * - * To print debug message, add the following line to log4j.properties: - * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG - * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) - */ -object ImportZkOffsets extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") - .withRequiredArg() - .defaultsTo("localhost:2181") - .ofType(classOf[String]) - val inFileOpt = parser.accepts("input-file", "Input file") - .withRequiredArg() - .ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - val partitionOffsetFile = options.valueOf(inFileOpt) - - val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile) - - updateZkOffsets(zkClient, partitionOffsets) - } - - private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = { - val fr = new FileReader(filename) - val br = new BufferedReader(fr) - var partOffsetsMap: Map[String,String] = Map() - - var s: String = br.readLine() - while ( s != null && s.length() >= 1) { - val tokens = s.split(":") - - partOffsetsMap += tokens(0) -> tokens(1) - debug("adding node path [" + s + "]") - - s = br.readLine() - } - - return partOffsetsMap - } - - private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = { - for ((partition, offset) <- partitionOffsets) { - debug("updating [" + partition + "] with offset [" + offset + "]") - - try { - ZkUtils.updatePersistentPath(zkClient, partition, offset.toString) - } catch { - case e: Throwable => e.printStackTrace() - } - } - } -} diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 738c1af..61a4841 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -24,6 +24,8 @@ import java.nio.channels._ import java.util.concurrent.locks.{ReadWriteLock, Lock} import java.lang.management._ import javax.management._ 
+import kafka.cluster.Broker + import scala.collection._ import scala.collection.mutable import java.util.Properties @@ -607,4 +609,21 @@ object Utils extends Logging { .filter{ case (k,l) => (l > 1) } .keys } + + def parseBrokerList(brokerList: String): Seq[Broker] = + { + var brokers = mutable.Seq[Broker]() + if (brokerList != null) { + val hostAndPort = brokerList.split(",") + brokers = hostAndPort.map(hostPort => { + val temp = hostPort.split(":") + val host = temp(0) + val port = temp(1).toInt + new Broker(Math.random().toInt, host, port) + }) + } + + brokers + } + } diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 41f334d..50c7da1 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -1,8 +1,9 @@ package other.kafka +import kafka.tools.{OffsetClient, OffsetClientConfig} import org.I0Itec.zkclient.ZkClient import kafka.api._ -import kafka.utils.{ShutdownableThread, ZKStringSerializer} +import kafka.utils.{ZkUtils, ShutdownableThread, ZKStringSerializer} import scala.collection._ import kafka.client.ClientUtils import joptsimple.OptionParser @@ -12,7 +13,6 @@ import scala.util.Random import java.io.IOException import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import java.util.concurrent.TimeUnit -import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicInteger import java.nio.channels.ClosedByInterruptException @@ -21,6 +21,7 @@ object TestOffsetManager { val random = new Random val SocketTimeoutMs = 10000 + val clientConfig = OffsetClientConfig(SocketTimeoutMs) class StatsThread(reportingIntervalMs: Long, commitThreads: Seq[CommitThread], fetchThread: FetchThread) extends ShutdownableThread("stats-thread") { @@ -52,7 +53,8 @@ object TestOffsetManager { private val group = "group-" + id private val metadata = "Metadata from commit thread " + id - private var offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs) + private val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + private var offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get private var offset = 0L val numErrors = new AtomicInteger(0) val numCommits = new AtomicInteger(0) @@ -62,7 +64,7 @@ object TestOffsetManager { private def ensureConnected() { if (!offsetsChannel.isConnected) - offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs) + offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get } override def doWork() { @@ -124,7 +126,8 @@ object TestOffsetManager { val channel = if (channels.contains(coordinatorId)) channels(coordinatorId) else { - val newChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs) + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val newChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get channels.put(coordinatorId, newChannel) newChannel } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 8c5364f..a64bcb5 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -18,14 +18,15 @@ package kafka.server import java.io.File +import kafka.cluster.Broker +import kafka.tools.{OffsetClientConfig, OffsetClient} import kafka.utils._ import 
junit.framework.Assert._ import java.util.Properties -import kafka.consumer.SimpleConsumer import org.junit.{After, Before, Test} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite -import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} +import kafka.api.{OffsetCommitRequest, OffsetFetchRequest} import kafka.utils.TestUtils._ import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} import scala.util.Random @@ -39,8 +40,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { var logSize: Int = 100 val brokerPort: Int = 9099 val group = "test-group" - var simpleConsumer: SimpleConsumer = null var time: Time = new MockTime() + var brokerList = Seq[Broker]() @Before override def setUp() { @@ -49,21 +50,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime() - server = TestUtils.createServer(new KafkaConfig(config), time) - simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client") - val consumerMetadataRequest = ConsumerMetadataRequest(group) - Stream.continually { - val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest) - consumerMetadataResponse.coordinatorOpt.isDefined - }.dropWhile(success => { - if (!success) Thread.sleep(1000) - !success - }) + val kafkaConfig = new KafkaConfig(config) + server = TestUtils.createServer(kafkaConfig, time) + brokerList = brokerList :+ new Broker(1,kafkaConfig.hostName , kafkaConfig.port) } @After override def tearDown() { - simpleConsumer.close server.shutdown Utils.rm(logDir) super.tearDown() @@ -80,13 +73,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) - val commitResponse = simpleConsumer.commitOffsets(commitRequest) + val clientConfig = OffsetClientConfig() + val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) // Fetch it and verify val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition)) - val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) + val fetchResponse = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error) assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata) @@ -97,13 +91,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { offset=100L, metadata="some metadata" ))) - val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1) + val commitResponse1 = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest1) assertEquals(ErrorMapping.NoError, commitResponse1.commitStatus.get(topicAndPartition).get) // Fetch it and verify val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition)) - val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1) + val fetchResponse1 = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest1) assertEquals(ErrorMapping.NoError, 
fetchResponse1.requestInfo.get(topicAndPartition).get.error) assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata) @@ -124,13 +118,15 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic3, servers = Seq(server), numPartitions = 1) createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1) + val clientConfig = OffsetClientConfig() + val commitRequest = OffsetCommitRequest("test-group", immutable.Map( TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"), TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=43L, metadata="metadata two"), TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"), TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L) )) - val commitResponse = simpleConsumer.commitOffsets(commitRequest) + val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get) @@ -145,7 +141,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { TopicAndPartition(topic4, 0), // An unused topic TopicAndPartition(topic5, 0) // An unknown topic )) - val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) + val fetchResponse = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error) @@ -181,12 +177,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(1)) createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) + val clientConfig = OffsetClientConfig() val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata( offset=42L, metadata=random.nextString(server.config.offsetMetadataMaxSize) ))) - val commitResponse = simpleConsumer.commitOffsets(commitRequest) + val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) @@ -194,7 +191,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { offset=42L, metadata=random.nextString(server.config.offsetMetadataMaxSize + 1) ))) - val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1) + val commitResponse1 = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest1) assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get) -- 1.9.3 (Apple Git-50) From 4103cb9464d7321eea04764375e8331e6f486228 Mon Sep 17 00:00:00 2001 From: mgharat Date: Sat, 27 Dec 2014 16:12:02 -0800 Subject: [PATCH 3/5] Combined the import/export offset tool in to a single tool. 
Refactored OffsetClient --- core/src/main/scala/kafka/client/ClientUtils.scala | 53 ++- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 3 +- .../scala/kafka/tools/ImportExportOffsetTool.scala | 369 +++++++++++++++++++++ .../main/scala/kafka/tools/KafkaOffsetClient.scala | 235 +++++++++++++ core/src/main/scala/kafka/tools/OffsetClient.scala | 300 +---------------- .../scala/kafka/tools/OffsetClientConfig.scala | 2 +- .../test/scala/other/kafka/TestOffsetManager.scala | 10 +- .../scala/unit/kafka/server/OffsetCommitTest.scala | 21 +- 8 files changed, 670 insertions(+), 323 deletions(-) create mode 100644 core/src/main/scala/kafka/tools/ImportExportOffsetTool.scala create mode 100644 core/src/main/scala/kafka/tools/KafkaOffsetClient.scala diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 03a5786..f6d6692 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -16,9 +16,9 @@ */ package kafka.client - import kafka.tools.{OffsetClient, OffsetClientConfig} + import kafka.tools.{KafkaOffsetClient, OffsetClientConfig} - import scala.collection._ +import scala.collection._ import kafka.cluster._ import kafka.api._ import kafka.producer._ @@ -26,10 +26,9 @@ import kafka.common.{ErrorMapping, KafkaException} import kafka.utils.{ZkUtils, Utils, Logging} import java.util.Properties import util.Random - import kafka.network.BlockingChannel - import kafka.utils.ZkUtils._ - import org.I0Itec.zkclient.ZkClient - import java.io.IOException +import kafka.network.BlockingChannel +import kafka.utils.ZkUtils._ +import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.utils.Utils.{getHost, getPort} /** @@ -45,9 +44,36 @@ object ClientUtils extends Logging{ * @return topic metadata response */ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { - val config = new OffsetClientConfig(producerConfig.requestTimeoutMs, producerConfig.retryBackoffMs, producerConfig.messageSendMaxRetries, producerConfig.clientId) - val allTopics = topics.toSeq - val topicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, allTopics) + var fetchMetaDataSucceeded: Boolean = false + var i: Int = 0 + val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) + var topicMetadataResponse: TopicMetadataResponse = null + var t: Throwable = null + // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the + // same broker + val shuffledBrokers = Random.shuffle(brokers) + while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) { + val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i)) + info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics)) + try { + topicMetadataResponse = producer.send(topicMetadataRequest) + fetchMetaDataSucceeded = true + } + catch { + case e: Throwable => + 
warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed" + .format(correlationId, topics, shuffledBrokers(i).toString), e) + t = e + } finally { + i = i + 1 + producer.close() + } + } + if(!fetchMetaDataSucceeded) { + throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t) + } else { + debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) + } return topicMetadataResponse } @@ -112,8 +138,9 @@ object ClientUtils extends Logging{ * Creates a blocking channel to the offset manager of the given group */ def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = { - val brokers = ZkUtils.getAllBrokersInCluster(zkClient) - val config = OffsetClientConfig(socketTimeoutMs, retryBackOffMs) - OffsetClient.getOffsetManagerChannel(config, brokers, group).get + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val config = OffsetClientConfig(socketTimeoutMs, retryBackOffMs) + val offsetClient = new KafkaOffsetClient(group); + offsetClient.getOffsetManagerChannel(config, brokers).get } } diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index c8c223b..4680142 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -160,7 +160,8 @@ object ConsumerOffsetChecker extends Logging { val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq val config = OffsetClientConfig(channelSocketTimeoutMs, channelRetryBackoffMs) val brokers = ZkUtils.getAllBrokersInCluster(zkClient) - val channel = OffsetClient.getOffsetManagerChannel(config, brokers, group).get + val offsetClient = new KafkaOffsetClient(group); + val channel = offsetClient.getOffsetManagerChannel(config, brokers).get debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) diff --git a/core/src/main/scala/kafka/tools/ImportExportOffsetTool.scala b/core/src/main/scala/kafka/tools/ImportExportOffsetTool.scala new file mode 100644 index 0000000..5b49c37 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ImportExportOffsetTool.scala @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.tools
+
+import java.io._
+import java.util.concurrent.TimeUnit
+import joptsimple.OptionParser
+import kafka.api.{OffsetFetchRequest, TopicMetadataResponse, OffsetFetchResponse, OffsetCommitRequest}
+import kafka.client.ClientUtils
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.io.Source
+
+/**
+ * An import/export utility that retrieves or updates the offsets of broker partitions in ZK or the OffsetManager.
+ * The export operation prints to an output file in the following format:
+ *
+ * ZK format
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ * /consumers/group1/offsets/topic1/2-0:284803985
+ *
+ * OffsetManager file format
+ * topic/partition/offset
+ *
+ * The export operation expects the following arguments:
+ * For using Zookeeper:
+ * 1. Zk host:port string
+ * 2. group name (all groups implied if omitted)
+ * 3. output filename
+ *
+ * For using the OffsetManager:
+ * 1. group name
+ * 2. output filename
+ * 3. broker list (host:port)
+ * 4. config file (optional)
+ * 5. topic list (optional)
+ *
+ * The import operation expects the following arguments:
+ * For using Zookeeper:
+ * 1. consumer properties file
+ * 2. a file containing partition offset data such as:
+ *    (this file can be produced by running this tool with the "export" operation)
+ *
+ * For using the OffsetManager:
+ * 1. group name
+ * 2. input filename (file to read from)
+ * 3. broker list (host:port)
+ * 4. config file (optional)
+ *
+ * This utility expects an argument that determines the operation to be performed:
+ * operation ("export"/"import")
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ImportExportOffsetTool$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+
+object ImportExportOffsetTool extends Logging {
+  val importOperation = "import"
+  val exportOperation = "export"
+
+  def main(args: Array[String]): Unit = {
+
+    val parser = new OptionParser()
+
+    val operationOpt = parser.accepts("operation", "Operation to be performed: import or export offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+      .withRequiredArg()
+      .defaultsTo("localhost:2181")
+      .ofType(classOf[String])
+
+    val fileOpt = parser.accepts("file", "Input/Output file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val group = parser.accepts("group", "Consumer Group Id")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket timeout for the offsets channel")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    // Specific to importing offsets
+    val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for committing offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    // Specific to exporting offsets
+    val topicList = parser.accepts("topics", "Topics for which offsets/partitions should be fetched")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Import/Export offsets to Kafka/ZooKeeper from files.")
+
+    val options = parser.parse(args: _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    // Brokers
+    val allBrokers = options.valueOf(brokerList)
+    var brokers = Utils.parseBrokerList(allBrokers)
+
+    // OffsetClientConfig
+    var value: String = null
+    val socketTimeOut = {
+      value = options.valueOf(offsetsChannelSocketTimeoutMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+    val channelBackOff = {
+      value = options.valueOf(offsetsChannelBackoffMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+    val commitRetries = {
+      value = options.valueOf(offsetsCommitMaxRetries)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
+    }
+
+    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
+
+    val partitionOffsetFile = options.valueOf(fileOpt)
+
+    val groupId: String = options.valueOf(group)
+
+    CommandLineUtils.checkRequiredArgs(parser, options, operationOpt)
+    val operation = options.valueOf(operationOpt)
+    // Import Offsets
+    if (operation.equalsIgnoreCase(importOperation)) {
+      // Check whether to use zookeeper or the kafka OffsetManager
+      if (allBrokers != null) {
+        val fileLines = Source.fromFile(partitionOffsetFile).getLines().toList
+        val offsetsToCommit = fileLines.map(line => {
+          val topicPartitionOffset = line.split("/")
+          val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt)
+          // Offsets are Longs; parse with toLong so values beyond Int.MaxValue survive the round trip
+          val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toLong)
+          println(topicAndPartition + "->" + offsetMetadata)
+          (topicAndPartition -> offsetMetadata)
+        }).toMap
+
+        var done = false
+        var offsetsCommitRetries = 0
+        var offsetsChannelBackoffMS = 0
+
+        if (config != null) {
+          offsetsCommitRetries = config.offsetsCommitRetries
+          offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
+        }
+        else {
+          offsetsCommitRetries = OffsetClientConfig.OffsetsCommitMaxRetries
+          offsetsChannelBackoffMS = OffsetClientConfig.OffsetsChannelBackoffMs
+        }
+
+        var retriesRemaining = 1 + offsetsCommitRetries
+
+        if (offsetsToCommit.size > 0) {
+          val offsetCommitRequest = OffsetCommitRequest(groupId, offsetsToCommit, clientId = config.clientId)
+          val offsetClient = new KafkaOffsetClient(groupId)
+          // Retry until the commit succeeds or the retry budget is exhausted
+          while (!done && retriesRemaining > 0) {
+            try {
+              offsetClient.commitOffsets(config, brokers, offsetCommitRequest)
+              done = true
+            }
+            catch {
+              case e: Exception =>
+                done = false
+            }
+            retriesRemaining = retriesRemaining - 1
+
+            if (!done && retriesRemaining > 0) {
+              debug("Retrying offset commit in %d ms".format(offsetsChannelBackoffMS))
+              TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
+            }
+          }
+        }
+        else {
+          debug("No updates to offsets since last commit.")
+        }
+      }
+      else {
+        CommandLineUtils.checkRequiredArgs(parser, options, fileOpt)
+
+        val zkConnect = options.valueOf(zkConnectOpt)
+
+        val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+        val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+        updateZkOffsets(zkClient, partitionOffsets)
+      }
+    }
+    else if (operation.equalsIgnoreCase(exportOperation)) // Export Offsets
+    {
+      if (allBrokers != null) {
+        val file = new File(partitionOffsetFile)
+        var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None
+        var topicAndPartition =
collection.Seq[TopicAndPartition]() + + var offsetsChannelBackoffMs = 0 + if (config != null) { + offsetsChannelBackoffMs = config.offsetsChannelBackoffMs + } + else { + offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs + } + + var topics = collection.Seq[String]() + var topicListValue = options.valueOf(topicList) + if (topicListValue != null) { + topics = topicListValue.split(",").toSeq + } + + if (topics.size > 0) { + val topicMetaDataResponse: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics.toSet, brokers, OffsetClientConfig.DefaultClientId, OffsetClientConfig.OffsetsChannelSocketTimeoutMs) + topicAndPartition = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => { + val topic = topicMetaData.topic + topicMetaData.partitionsMetadata.map(partitionMetadata => { + new TopicAndPartition(topic, partitionMetadata.partitionId) + } + ) + } + ) + } + + val offsetFetchRequest = OffsetFetchRequest(groupId, requestInfo = topicAndPartition, clientId = config.clientId) + val offsetClient = new KafkaOffsetClient(groupId) + + while (!offSetFetchResponseOpt.isDefined) { + val response = offsetClient.fetchOffsets(config, brokers, offsetFetchRequest) + println(response) + offSetFetchResponseOpt = Some(response) + } + val offsetFetchResponse = offSetFetchResponseOpt.get + val topicPartitionOffsetMap = offsetFetchResponse.requestInfo + + // writing to file + val writer = new BufferedWriter(new FileWriter(file)) + topicPartitionOffsetMap.foreach(tpo => { + val topic = tpo._1.topic + val partition = tpo._1.partition + val offset = tpo._2.offset + writer.write(topic + "/" + partition + "/" + offset) + writer.newLine() + }) + writer.flush() + writer.close() + } + else { + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, fileOpt) + + val zkConnect = options.valueOf(zkConnectOpt) + val groups = options.valuesOf(group) + + var zkClient: ZkClient = null + val fileWriter: FileWriter = new FileWriter(partitionOffsetFile) + + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + var consumerGroups: Seq[String] = null + + if (groups.size == 0) { + consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList + } + else { + import scala.collection.JavaConversions._ + consumerGroups = groups + } + + for (consumerGrp <- consumerGroups) { + val topicsList = getTopicsList(zkClient, consumerGrp) + + for (topic <- topicsList) { + val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) + + for (bidPid <- bidPidList) { + val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic) + val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid + ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { + case Some(offsetVal) => + fileWriter.write(offsetPath + ":" + offsetVal + "\n") + debug(offsetPath + " => " + offsetVal) + case None => + error("Could not retrieve offset value from " + offsetPath) + } + } + } + } + } + finally { + fileWriter.flush() + fileWriter.close() + } + } + } + } + + private def getPartitionOffsetsFromFile(filename: String): Map[String, String] = { + val fr = new FileReader(filename) + val br = new BufferedReader(fr) + var partOffsetsMap: Map[String, String] = Map() + + var s: String = br.readLine() + while (s != null && s.length() >= 1) { + val tokens = s.split(":") + + partOffsetsMap += tokens(0) -> tokens(1) + debug("adding node path [" + s + "]") + + s = br.readLine() + } + + return partOffsetsMap + } + + private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String, String]): 
Unit = { + for ((partition, offset) <- partitionOffsets) { + debug("updating [" + partition + "] with offset [" + offset + "]") + + try { + ZkUtils.updatePersistentPath(zkClient, partition, offset.toString) + } catch { + case e: Throwable => e.printStackTrace() + } + } + } + + private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { + return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList + } + + private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { + return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList + } + +} diff --git a/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala b/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala new file mode 100644 index 0000000..b560918 --- /dev/null +++ b/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala @@ -0,0 +1,235 @@ +package kafka.tools + +import java.io.IOException +import java.util.concurrent.TimeUnit + +import kafka.api._ +import kafka.common.{KafkaException, ErrorMapping} +import kafka.network.BlockingChannel +import kafka.utils.Logging +import kafka.cluster.Broker +import scala.util.Random + +class KafkaOffsetClient(val group: String) extends OffsetClient with Logging{ + + override def fetchOffsets(config: OffsetClientConfig, brokers: Seq[Broker], offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse = { + var offsetsChannelBackoffMs = 0 + if (config != null) { + offsetsChannelBackoffMs = config.offsetsChannelBackoffMs + } + else { + offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs + } + var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None + while (!offsetFetchResponseOpt.isDefined) { + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get + try { + offsetManagerChannel.send(offsetFetchRequest) + val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset fetch response: %s.".format(offsetFetchResponse)) + + val (leaderChanged, loadInProgress) = + offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) => + (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode), + folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode)) + } + + if (leaderChanged) { + offsetManagerChannel.disconnect() + debug("Could not fetch offsets (because offset manager has moved).") + } + else if (loadInProgress) { + debug("Could not fetch offsets (because offset cache is being loaded).") + } + else { + offsetFetchResponseOpt = Some(offsetFetchResponse) + offsetManagerChannel.disconnect() + } + } + catch { + case e: Exception => + warn("Error while fetching offsets from [%s:%d]".format(offsetManagerChannel.host, offsetManagerChannel.port)) + } + finally + { + if(offsetManagerChannel.isConnected) + offsetManagerChannel.disconnect() + } + + if (offsetFetchResponseOpt.isEmpty) { + debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMs)) + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + } + } + + offsetFetchResponseOpt.get + } + + override def commitOffsets(config: OffsetClientConfig, brokers: Seq[Broker], offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse = { + var committed = false + var offsetCommitResponse: OffsetCommitResponse = null + + + val offsetManagerChannel: BlockingChannel = 
getOffsetManagerChannel(config, brokers).get + try { + offsetManagerChannel.send(offsetCommitRequest) + offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer) + trace("Offset commit response: %s.".format(offsetCommitResponse)) + + val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { + offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => + + (folded._1 || //update commitFailed + errorCode != ErrorMapping.NoError, + + folded._2 || //update retryableIfFailed - (only metadata too large is not retryable) + (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), + + folded._3 || //update shouldRefreshCoordinator + errorCode == ErrorMapping.NotCoordinatorForConsumerCode || + errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, + + //update error count + folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) + } + } + debug(errorCount + " errors in offset commit response.") + + if (shouldRefreshCoordinator) { + debug("Could not commit offsets (because offset manager has moved or is unavailable).") + offsetManagerChannel.disconnect() + } + + if (commitFailed && retryableIfFailed) { + committed = false + } + else { + committed = true + offsetManagerChannel.disconnect() + } + } + catch { + case t: Throwable => + error("Error while committing offsets.", t) + offsetManagerChannel.disconnect() + throw new KafkaException(t) + } + finally + { + if(offsetManagerChannel.isConnected) + offsetManagerChannel.disconnect() + } + + offsetCommitResponse + } + + override def close: Unit = { + info("OffsetClient closed") + } + + private def getQueryChannel(config: OffsetClientConfig, brokers: Seq[Broker]): BlockingChannel = { + var socketTimeoutMs = { + if (config != null) + config.offsetsChannelSocketTimeoutMs + else + OffsetClientConfig.OffsetsChannelSocketTimeoutMs + } + var channel: BlockingChannel = null + var connected = false + val shuffledBrokers = Random.shuffle(brokers) + var i: Int = 0 + + while (!connected) { + val broker = shuffledBrokers(i) + i = (i + 1) % shuffledBrokers.size + trace("Connecting to broker...") + try { + channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) + channel.connect() + debug("Created channel to broker [%s:%d].".format(channel.host, channel.port)) + } + catch { + case e: Exception => + if (channel != null) channel.disconnect() + channel = null + error("Error while creating channel to [%s:%d]. 
Retrying...".format(broker.host, broker.port)) + } + connected = channel.isConnected + } + + channel + } + + /** + * Creates a blocking channel to the offset manager of the given group + */ + def getOffsetManagerChannel(config: OffsetClientConfig, brokers: Seq[Broker]): Option[BlockingChannel] = { + var (offsetsChannelBackoffMs, socketTimeoutMs) = { + if (config != null) { + (config.offsetsChannelBackoffMs, config.offsetsChannelSocketTimeoutMs) + } + else { + (OffsetClientConfig.OffsetsChannelBackoffMs, OffsetClientConfig.OffsetsChannelSocketTimeoutMs) + } + } + var offSetManagerChannel: BlockingChannel = null + var queryChannel = getQueryChannel(config, brokers) + var offSetManagerChannelOpt: Option[BlockingChannel] = None + + while (!offSetManagerChannelOpt.isDefined) { + + var offsetManagerOpt: Option[Broker] = None + + while (!offsetManagerOpt.isDefined) { + try { + if (!queryChannel.isConnected) { + queryChannel = getQueryChannel(config, brokers) + } + debug("Querying [%s:%d] to locate offset manager for group [%s].".format(queryChannel.host, queryChannel.port, group)) + queryChannel.send(ConsumerMetadataRequest(group, ConsumerMetadataRequest.CurrentVersion, 0, config.clientId)) + val response = queryChannel.receive() + val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) + debug("Consumer metadata response: " + consumerMetadataResponse)//.coordinatorOpt.get.id) + if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) { + offsetManagerOpt = consumerMetadataResponse.coordinatorOpt + } + else { + debug("Query to [%s:%d] to locate offset manager for group [%s] failed - will retry in %d milliseconds." + .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMs)) + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + } + } + catch { + case ioe: IOException => + error("Error while connecting to [%s:%d] for fetching consumer metadata".format(queryChannel.host, queryChannel.port)) + queryChannel.disconnect() + } + } + + val offsetManager = offsetManagerOpt.get + if (offsetManager.host == queryChannel.host && offsetManager.port == queryChannel.port) { + offSetManagerChannelOpt = Some(queryChannel) + } + else { + val connectString = "[%s:%d]".format(offsetManager.host, offsetManager.port) + try { + offSetManagerChannel = new BlockingChannel(offsetManager.host, offsetManager.port, BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) + offSetManagerChannel.connect() + offSetManagerChannelOpt = Some(offSetManagerChannel) + info("Connected to offset manager [%s:%d] for group [%s].".format(offSetManagerChannel.host, offSetManagerChannel.port, group)) + queryChannel.disconnect() + } + catch { + case ioe: IOException => + error("Error while connecting to %s.".format(connectString)) + if (offSetManagerChannel != null) offSetManagerChannel.disconnect() + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + offSetManagerChannelOpt = None + } + } + } + + offSetManagerChannelOpt + } +} diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala index bcfa216..7a51443 100644 --- a/core/src/main/scala/kafka/tools/OffsetClient.scala +++ b/core/src/main/scala/kafka/tools/OffsetClient.scala @@ -1,303 +1,13 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package kafka.tools -import java.io.IOException -import java.util.concurrent.TimeUnit - -import kafka.api._ -import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition} -import kafka.network.BlockingChannel -import kafka.utils.Logging +import kafka.api.{OffsetCommitResponse, OffsetCommitRequest, OffsetFetchResponse, OffsetFetchRequest} import kafka.cluster.Broker -import scala.util.Random - -/** - * A utility that provides APIs to fetch and commit offsets - * - */ -object OffsetClient extends Logging { - - private def getQueryChannel(config: OffsetClientConfig, brokers: Seq[Broker]): BlockingChannel = { - var socketTimeoutMs = { - if (config != null) - config.offsetsChannelSocketTimeoutMs - else - OffsetClientConfig.OffsetsChannelSocketTimeoutMs - } - var channel: BlockingChannel = null - var connected = false - val shuffledBrokers = Random.shuffle(brokers) - var i: Int = 0 - - while (!connected) { - val broker = shuffledBrokers(i) - i = (i + 1) % shuffledBrokers.size - trace("Connecting to broker...") - try { - channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) - channel.connect() - debug("Created channel to broker [%s:%d].".format(channel.host, channel.port)) - } - catch { - case e: Exception => - if (channel != null) channel.disconnect() - channel = null - error("Error while creating channel to [%s:%d]. 
Retrying...".format(broker.host, broker.port)) - } - connected = channel.isConnected - } - - channel - } - - /** - * Creates a blocking channel to the offset manager of the given group - */ - def getOffsetManagerChannel(config: OffsetClientConfig, brokers: Seq[Broker], group: String): Option[BlockingChannel] = { - var (offsetsChannelBackoffMs, socketTimeoutMs) = { - if (config != null) { - (config.offsetsChannelBackoffMs, config.offsetsChannelSocketTimeoutMs) - } - else { - (OffsetClientConfig.OffsetsChannelBackoffMs, OffsetClientConfig.OffsetsChannelSocketTimeoutMs) - } - } - var offSetManagerChannel: BlockingChannel = null - var queryChannel = getQueryChannel(config, brokers) - var offSetManagerChannelOpt: Option[BlockingChannel] = None - - while (!offSetManagerChannelOpt.isDefined) { - - var offsetManagerOpt: Option[Broker] = None - - while (!offsetManagerOpt.isDefined) { - try { - if (!queryChannel.isConnected) { - queryChannel = getQueryChannel(config, brokers) - } - debug("Querying [%s:%d] to locate offset manager for group [%s].".format(queryChannel.host, queryChannel.port, group)) - queryChannel.send(ConsumerMetadataRequest(group, ConsumerMetadataRequest.CurrentVersion, 0, config.clientId)) - val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) - debug("Consumer metadata response: " + consumerMetadataResponse)//.coordinatorOpt.get.id) - if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) { - offsetManagerOpt = consumerMetadataResponse.coordinatorOpt - } - else { - debug("Query to [%s:%d] to locate offset manager for group [%s] failed - will retry in %d milliseconds." - .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMs)) - TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) - } - } - catch { - case ioe: IOException => - error("Error while connecting to [%s:%d] for fetching consumer metadata".format(queryChannel.host, queryChannel.port)) - queryChannel.disconnect() - } - } - - val offsetManager = offsetManagerOpt.get - if (offsetManager.host == queryChannel.host && offsetManager.port == queryChannel.port) { - offSetManagerChannelOpt = Some(queryChannel) - } - else { - val connectString = "[%s:%d]".format(offsetManager.host, offsetManager.port) - try { - offSetManagerChannel = new BlockingChannel(offsetManager.host, offsetManager.port, BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) - offSetManagerChannel.connect() - offSetManagerChannelOpt = Some(offSetManagerChannel) - info("Connected to offset manager [%s:%d] for group [%s].".format(offSetManagerChannel.host, offSetManagerChannel.port, group)) - queryChannel.disconnect() - } - catch { - case ioe: IOException => - error("Error while connecting to %s.".format(connectString)) - if (offSetManagerChannel != null) offSetManagerChannel.disconnect() - TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) - offSetManagerChannelOpt = None - } - } - } - - offSetManagerChannelOpt - } - - def fetchTopicMetadata(config: OffsetClientConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = { - var topicAndPartitions = List[TopicAndPartition]() - var fetchMetadataSucceeded = false - var i: Int = 0 - var topicMetadataChannel = getQueryChannel(config, brokers) - var topicMetaDataResponse: TopicMetadataResponse = null - var t: Throwable = null - - while (i < brokers.size && !fetchMetadataSucceeded) { - try { - if (!topicMetadataChannel.isConnected) { - topicMetadataChannel = 
getQueryChannel(config, brokers) - } - info("Fetching metadata from broker [%s:%d]".format(topicMetadataChannel.host, topicMetadataChannel.port)) - val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics) - topicMetadataChannel.send(topicMetaDataRequest) - topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer) - fetchMetadataSucceeded = true - topicMetadataChannel.disconnect() - } - catch { - case e: Exception => - warn("Fetching metadata for topics %s from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port)) - topicMetadataChannel.disconnect() - t = e - } - finally - { - i = i + 1 - } - - // TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) - } - - if(!fetchMetadataSucceeded) { - throw new KafkaException("fetching topic metadata for topics %s from broker [%s] failed".format(topics, brokers), t) - } else { - debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) - } - - topicMetaDataResponse - } - - def fetchOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse = { - var offsetsChannelBackoffMs = 0 - if (config != null) { - offsetsChannelBackoffMs = config.offsetsChannelBackoffMs - } - else { - offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs - } - var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None - while (!offsetFetchResponseOpt.isDefined) { - val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get - try { - offsetManagerChannel.send(offsetFetchRequest) - val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer) - trace("Offset fetch response: %s.".format(offsetFetchResponse)) +trait OffsetClient { - val (leaderChanged, loadInProgress) = - offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) => - (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode), - folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode)) - } + def fetchOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse - if (leaderChanged) { - offsetManagerChannel.disconnect() - debug("Could not fetch offsets (because offset manager has moved).") - } - else if (loadInProgress) { - debug("Could not fetch offsets (because offset cache is being loaded).") - } - else { - offsetFetchResponseOpt = Some(offsetFetchResponse) - offsetManagerChannel.disconnect() - } - } - catch { - case e: Exception => - warn("Error while fetching offsets from [%s:%d]".format(offsetManagerChannel.host, offsetManagerChannel.port)) - } - finally - { - if(offsetManagerChannel.isConnected) - offsetManagerChannel.disconnect() - } + def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse - if (offsetFetchResponseOpt.isEmpty) { - debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMs)) - TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) - } - } - - offsetFetchResponseOpt.get - } - - def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse = { - var committed = false - var offsetCommitResponse: 
OffsetCommitResponse = null - - - val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get - try { - offsetManagerChannel.send(offsetCommitRequest) - offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer) - trace("Offset commit response: %s.".format(offsetCommitResponse)) - - val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { - offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => - - (folded._1 || //update commitFailed - errorCode != ErrorMapping.NoError, - - folded._2 || //update retryableIfFailed - (only metadata too large is not retryable) - (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), - - folded._3 || //update shouldRefreshCoordinator - errorCode == ErrorMapping.NotCoordinatorForConsumerCode || - errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, - - //update error count - folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) - } - } - debug(errorCount + " errors in offset commit response.") - - if (shouldRefreshCoordinator) { - debug("Could not commit offsets (because offset manager has moved or is unavailable).") - offsetManagerChannel.disconnect() - } - - if (commitFailed && retryableIfFailed) { - committed = false - } - else { - committed = true - offsetManagerChannel.disconnect() - } - } - catch { - case t: Throwable => - error("Error while committing offsets.", t) - offsetManagerChannel.disconnect() - throw new KafkaException(t) - } - finally - { - if(offsetManagerChannel.isConnected) - offsetManagerChannel.disconnect() - } - - offsetCommitResponse - } - - // def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], offsetCommitRequest: kafka.javaapi.OffsetCommitRequest, group: String): Boolean = { - // return commitOffsets(config, brokers, offsetCommitRequest.underlying, group) - // } + def close } - - - - diff --git a/core/src/main/scala/kafka/tools/OffsetClientConfig.scala b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala index 69f4ce6..bc9452c 100644 --- a/core/src/main/scala/kafka/tools/OffsetClientConfig.scala +++ b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala @@ -18,7 +18,7 @@ package kafka.tools import kafka.common.Config -object OffsetClientConfig extends Config { +object OffsetClientConfig extends Config{ val OffsetsChannelSocketTimeoutMs = 10000 val OffsetsChannelBackoffMs = 1000 val DefaultClientId = "" diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 50c7da1..fa4de6a 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -1,6 +1,6 @@ package other.kafka -import kafka.tools.{OffsetClient, OffsetClientConfig} +import kafka.tools.{KafkaOffsetClient, OffsetClientConfig} import org.I0Itec.zkclient.ZkClient import kafka.api._ import kafka.utils.{ZkUtils, ShutdownableThread, ZKStringSerializer} @@ -54,7 +54,8 @@ object TestOffsetManager { private val group = "group-" + id private val metadata = "Metadata from commit thread " + id private val brokers = ZkUtils.getAllBrokersInCluster(zkClient) - private var offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get + private val offsetClient = new KafkaOffsetClient(group) + private var offsetsChannel = offsetClient.getOffsetManagerChannel(clientConfig, brokers).get 
private var offset = 0L val numErrors = new AtomicInteger(0) val numCommits = new AtomicInteger(0) @@ -64,7 +65,7 @@ object TestOffsetManager { private def ensureConnected() { if (!offsetsChannel.isConnected) - offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get + offsetsChannel = offsetClient.getOffsetManagerChannel(clientConfig, brokers).get } override def doWork() { @@ -127,7 +128,8 @@ object TestOffsetManager { channels(coordinatorId) else { val brokers = ZkUtils.getAllBrokersInCluster(zkClient) - val newChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get + val offsetClient = new KafkaOffsetClient(group) + val newChannel = offsetClient.getOffsetManagerChannel(clientConfig, brokers).get channels.put(coordinatorId, newChannel) newChannel } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index a64bcb5..fd5e52f 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -19,7 +19,7 @@ package kafka.server import java.io.File import kafka.cluster.Broker -import kafka.tools.{OffsetClientConfig, OffsetClient} +import kafka.tools.{KafkaOffsetClient, OffsetClientConfig} import kafka.utils._ import junit.framework.Assert._ import java.util.Properties @@ -74,13 +74,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) val clientConfig = OffsetClientConfig() - val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest) + val offsetClient = new KafkaOffsetClient(group) + val commitResponse = offsetClient.commitOffsets(clientConfig, brokerList, commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) // Fetch it and verify val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition)) - val fetchResponse = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest) + val fetchResponse = offsetClient.fetchOffsets(clientConfig, brokerList, fetchRequest) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error) assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata) @@ -91,13 +92,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { offset=100L, metadata="some metadata" ))) - val commitResponse1 = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest1) + val commitResponse1 = offsetClient.commitOffsets(clientConfig, brokerList, commitRequest1) assertEquals(ErrorMapping.NoError, commitResponse1.commitStatus.get(topicAndPartition).get) // Fetch it and verify val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition)) - val fetchResponse1 = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest1) + val fetchResponse1 = offsetClient.fetchOffsets(clientConfig, brokerList, fetchRequest1) assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error) assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata) @@ -119,6 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1) val clientConfig = OffsetClientConfig() + 
val offsetClient = new KafkaOffsetClient(group) val commitRequest = OffsetCommitRequest("test-group", immutable.Map( TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"), @@ -126,7 +128,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"), TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L) )) - val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest) + val commitResponse = offsetClient.commitOffsets(clientConfig, brokerList, commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get) @@ -141,7 +143,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { TopicAndPartition(topic4, 0), // An unused topic TopicAndPartition(topic5, 0) // An unknown topic )) - val fetchResponse = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest) + val fetchResponse = offsetClient.fetchOffsets(clientConfig, brokerList, fetchRequest) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error) @@ -178,12 +180,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) val clientConfig = OffsetClientConfig() + val offsetClient = new KafkaOffsetClient(group) val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata( offset=42L, metadata=random.nextString(server.config.offsetMetadataMaxSize) ))) - val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest) + val commitResponse = offsetClient.commitOffsets(clientConfig, brokerList, commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) @@ -191,7 +194,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { offset=42L, metadata=random.nextString(server.config.offsetMetadataMaxSize + 1) ))) - val commitResponse1 = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest1) + val commitResponse1 = offsetClient.commitOffsets(clientConfig, brokerList, commitRequest1) assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get) -- 1.9.3 (Apple Git-50) From efad02be4c3ff8e83dd6415942bc6cb9125a42ae Mon Sep 17 00:00:00 2001 From: mgharat Date: Sat, 27 Dec 2014 16:14:51 -0800 Subject: [PATCH 4/5] Deleted old Export/Import offsets tool --- .../src/main/scala/kafka/tools/ExportOffsets.scala | 243 --------------------- .../src/main/scala/kafka/tools/ImportOffsets.scala | 229 ------------------- 2 files changed, 472 deletions(-) delete mode 100644 core/src/main/scala/kafka/tools/ExportOffsets.scala delete mode 100644 core/src/main/scala/kafka/tools/ImportOffsets.scala diff --git a/core/src/main/scala/kafka/tools/ExportOffsets.scala b/core/src/main/scala/kafka/tools/ExportOffsets.scala deleted file mode 100644 index 024a033..0000000 --- 
a/core/src/main/scala/kafka/tools/ExportOffsets.scala +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.tools - -import java.io.{BufferedWriter, File, FileWriter} -import joptsimple._ -import kafka.api.{OffsetFetchRequest, TopicMetadataResponse, OffsetFetchResponse} -import kafka.cluster.Broker -import kafka.common.TopicAndPartition -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient - - -/** - * A utility that retrieve the offset of broker partitions in ZK or OffsetManager and - * prints to an output file in the following format: - * - * ZK format - * /consumers/group1/offsets/topic1/1-0:286894308 - * /consumers/group1/offsets/topic1/2-0:284803985 - * - * OffsetManager file format - * topic/partition/offset - * - * This utility expects following arguments: - * For using Zookeeper : - * 1. Zk host:port string - * 2. group name (all groups implied if omitted) - * 3. output filename - * - * For using OffsetManager - * 1. group name - * 2. output filename - * 3. broker list (host:port) - * 4. config file (optional) - * 5. 
topic list (optional) - * - * To print debug message, add the following line to log4j.properties: - * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG - * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) - */ -object ExportOffsets extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") - .withRequiredArg() - .defaultsTo("localhost:2181") - .ofType(classOf[String]) - - val groupOpt = parser.accepts("group", "Consumer group.") - .withRequiredArg() - .ofType(classOf[String]) - - val outFileOpt = parser.accepts("output-file", "Output file") - .withRequiredArg() - .ofType(classOf[String]) - - val brokerList = parser.accepts("broker-list", "List of brokers") - .withRequiredArg() - .ofType(classOf[String]) - - val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket Time out for the Offset channel") - .withRequiredArg() - .ofType(classOf[String]) - - val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect") - .withRequiredArg() - .ofType(classOf[String]) - - val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for commiting offsets") - .withRequiredArg() - .ofType(classOf[String]) - - val topicList = parser.accepts("topics", "Topics for which offsets/Partitions should be fetched") - .withRequiredArg() - .ofType(classOf[String]) - - parser.accepts("help", "Print this message.") - - if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.") - - val options = parser.parse(args: _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - val allBrokers = options.valueOf(brokerList) - var brokers = Utils.parseBrokerList(allBrokers) - - var topics = collection.Seq[String]() - var topicListValue = options.valueOf(topicList) - if (topicListValue != null) { - topics = topicListValue.split(",").toSeq - } - - var offsetClientConfig: OffsetClientConfig = null - var value : String = null - val socketTimeOut = { - value = options.valueOf(offsetsChannelSocketTimeoutMs) - if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs - } - - val channelBackOff = { - value = options.valueOf(offsetsChannelBackoffMs) - if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs - } - - val commitRetries = { - value = options.valueOf(offsetsCommitMaxRetries) - if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries - } - - val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries) - - val outfile = options.valueOf(outFileOpt) - - val groupId = options.valueOf(groupOpt) - - // checking whether to use zookeeper or offsetManager - if (allBrokers != null) { - val file = new File(outfile) - var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None - var topicAndPartition = collection.Seq[TopicAndPartition]() - - var offsetsChannelBackoffMs = 0 - if (config != null) { - offsetsChannelBackoffMs = config.offsetsChannelBackoffMs - } - else { - offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs - } - - if (topics.size > 0) { - val topicMetaDataResponse: TopicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, topics) - topicAndPartition = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => { - val topic = topicMetaData.topic - 
topicMetaData.partitionsMetadata.map(partitionMetadata => { - new TopicAndPartition(topic, partitionMetadata.partitionId) - } - ) - } - ) - } - - val offsetFetchRequest = OffsetFetchRequest(groupId, requestInfo = topicAndPartition, clientId = config.clientId) - - while (!offSetFetchResponseOpt.isDefined) { - val response = OffsetClient.fetchOffsets(config, brokers, groupId, offsetFetchRequest) - println(response) - offSetFetchResponseOpt = Some(response) - } - val offsetFetchResponse = offSetFetchResponseOpt.get - val topicPartitionOffsetMap = offsetFetchResponse.requestInfo - - // writing to file - val writer = new BufferedWriter(new FileWriter(file)) - topicPartitionOffsetMap.foreach(tpo => { - val topic = tpo._1.topic - val partition = tpo._1.partition - val offset = tpo._2.offset - writer.write(topic + "/" + partition + "/" + offset) - writer.newLine() - }) - writer.flush() - writer.close() - } - else { - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - val groups = options.valuesOf(groupOpt) - - var zkClient: ZkClient = null - val fileWriter: FileWriter = new FileWriter(outfile) - - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - - var consumerGroups: Seq[String] = null - - if (groups.size == 0) { - consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList - } - else { - import scala.collection.JavaConversions._ - consumerGroups = groups - } - - for (consumerGrp <- consumerGroups) { - val topicsList = getTopicsList(zkClient, consumerGrp) - - for (topic <- topicsList) { - val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) - - for (bidPid <- bidPidList) { - val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic) - val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid - ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { - case Some(offsetVal) => - fileWriter.write(offsetPath + ":" + offsetVal + "\n") - debug(offsetPath + " => " + offsetVal) - case None => - error("Could not retrieve offset value from " + offsetPath) - } - } - } - } - } - finally { - fileWriter.flush() - fileWriter.close() - } - } - } - - private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { - return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList - } - - private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { - return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList - } -} - diff --git a/core/src/main/scala/kafka/tools/ImportOffsets.scala b/core/src/main/scala/kafka/tools/ImportOffsets.scala deleted file mode 100644 index 9546644..0000000 --- a/core/src/main/scala/kafka/tools/ImportOffsets.scala +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.BufferedReader -import java.io.FileReader -import java.util.concurrent.TimeUnit -import joptsimple._ -import kafka.api.OffsetCommitRequest -import kafka.common.{OffsetAndMetadata, TopicAndPartition} -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient - - -/** - * A utility that updates the offset of broker partitions in ZK or OffsetManager. - * - * This utility expects following arguments: - * - * For using Zookeeper : - * 1. consumer properties file - * 2. a file contains partition offsets data such as: - * (This output data file can be obtained by running kafka.tools.ExportZkOffsets) - * - * For using OffsetManager : - * 1. group name - * 2. input filename (file to read from) - * 3. broker list (host:port) - * 4. config file (optional) - * - * ZK format - * /consumers/group1/offsets/topic1/3-0:285038193 - * /consumers/group1/offsets/topic1/1-0:286894308 - * - * OffsetManager file format - * topic/partition/offset - * - * To print debug message, add the following line to log4j.properties: - * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG - * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) - */ - - import scala.io.Source - - - - object ImportOffsets extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") - .withRequiredArg() - .defaultsTo("localhost:2181") - .ofType(classOf[String]) - - val inFileOpt = parser.accepts("input-file", "Input file") - .withRequiredArg() - .ofType(classOf[String]) - - val brokerList = parser.accepts("broker-list", "List of brokers") - .withRequiredArg() - .ofType(classOf[String]) - - val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket Time out for the Offset channel") - .withRequiredArg() - .ofType(classOf[String]) - - val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect") - .withRequiredArg() - .ofType(classOf[String]) - - val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for commiting offsets") - .withRequiredArg() - .ofType(classOf[String]) - - val group = parser.accepts("group", "Consumer Group Id") - .withRequiredArg() - .ofType(classOf[String]) - - parser.accepts("help", "Print this message.") - - if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.") - - val options = parser.parse(args: _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - val allBrokers = options.valueOf(brokerList) - var brokers = Utils.parseBrokerList(allBrokers) - - var offsetClientConfig: OffsetClientConfig = null - var value : String = null - val socketTimeOut = { - value = options.valueOf(offsetsChannelSocketTimeoutMs) - if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs - } - - val channelBackOff = { - value = options.valueOf(offsetsChannelBackoffMs) - if (value != null) value.toInt else 
-
-    val commitRetries = {
-      value = options.valueOf(offsetsCommitMaxRetries)
-      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
-    }
-
-    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
-
-    val partitionOffsetFile = options.valueOf(inFileOpt)
-
-    var groupId: String = options.valueOf(group)
-
-    // checking whether to use zookeeper or offsetManager
-    if (allBrokers != null) {
-      val fileLines = Source.fromFile(partitionOffsetFile).getLines().toList
-      val offsetsToCommit = fileLines.map(line => {
-        val topicPartitionOffset = line.split("/")
-        val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt)
-        val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toInt)
-        println(topicAndPartition + "->" + offsetMetadata)
-        (topicAndPartition -> offsetMetadata)
-      }).toMap
-
-      var done = false
-      var offsetsCommitRetries = 0
-      var offsetsChannelBackoffMS = 0
-
-      if (config != null) {
-        offsetsCommitRetries = config.offsetsCommitRetries
-        offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
-      }
-      else {
-        offsetsCommitRetries = OffsetClientConfig.OffsetsCommitMaxRetries
-        offsetsChannelBackoffMS = OffsetClientConfig.OffsetsChannelBackoffMs
-      }
-
-      var retriesRemaining = 1 + offsetsCommitRetries
-
-      if( offsetsToCommit.size > 0)
-      {
-        val offsetCommitRequest = OffsetCommitRequest(groupId, offsetsToCommit, clientId = config.clientId)
-
-        while (done != true || retriesRemaining == 0) {
-          try {
-            OffsetClient.commitOffsets(config, brokers, groupId, offsetCommitRequest)
-            done = true
-          }
-          catch {
-            case e: Exception =>
-              done = false
-          }
-          retriesRemaining = retriesRemaining - 1
-
-          if (!done) {
-            debug("Retrying offset commit in %d ms".format(offsetsChannelBackoffMS))
-            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
-          }
-        }
-      }
-      else {
-        debug("No updates to offsets since last commit.")
-        true
-      }
-    }
-    else {
-      CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
-
-      val zkConnect = options.valueOf(zkConnectOpt)
-
-      val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile)
-
-      updateZkOffsets(zkClient, partitionOffsets)
-    }
-  }
-
-  private def getPartitionOffsetsFromFile(filename: String): Map[String, String] = {
-    val fr = new FileReader(filename)
-    val br = new BufferedReader(fr)
-    var partOffsetsMap: Map[String, String] = Map()
-
-    var s: String = br.readLine()
-    while (s != null && s.length() >= 1) {
-      val tokens = s.split(":")
-
-      partOffsetsMap += tokens(0) -> tokens(1)
-      debug("adding node path [" + s + "]")
-
-      s = br.readLine()
-    }
-
-    return partOffsetsMap
-  }
-
-  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String, String]): Unit = {
-    for ((partition, offset) <- partitionOffsets) {
-      debug("updating [" + partition + "] with offset [" + offset + "]")
-
-      try {
-        ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
-      } catch {
-        case e: Throwable => e.printStackTrace()
-      }
-    }
-  }
- }
-
-
-- 
1.9.3 (Apple Git-50)
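Note: two bugs in the commit path deleted above are worth flagging for whatever replaces it. The loop condition `while (done != true || retriesRemaining == 0)` never terminates once `retriesRemaining` reaches 0 while commits are still failing, because the left disjunct stays true; and the offset column of the `topic/partition/offset` input file is parsed with `toInt` although Kafka offsets are 64-bit. A corrected sketch of the same import step follows, assuming the `OffsetClient.commitOffsets` signature and `OffsetClientConfig` fields used in this series; `commitOffsetsFromFile` is a hypothetical helper name:

import java.util.concurrent.TimeUnit

import kafka.api.OffsetCommitRequest
import kafka.cluster.Broker
import kafka.common.{OffsetAndMetadata, TopicAndPartition}

import scala.io.Source

// Reads the topic/partition/offset file produced by the export tool and commits
// it through OffsetClient, retrying at most 1 + offsetsCommitRetries times with
// offsetsChannelBackoffMs between attempts. Returns true once a commit succeeds.
def commitOffsetsFromFile(config: OffsetClientConfig,
                          brokers: Seq[Broker],
                          groupId: String,
                          fileName: String): Boolean = {
  val offsetsToCommit = Source.fromFile(fileName).getLines().map { line =>
    // Kafka topic names cannot contain '/', so a three-way split is safe here.
    val Array(topic, partition, offset) = line.split("/")
    new TopicAndPartition(topic, partition.toInt) -> new OffsetAndMetadata(offset.toLong) // toLong: offsets are 64-bit
  }.toMap

  if (offsetsToCommit.isEmpty) return true

  val request = OffsetCommitRequest(groupId, offsetsToCommit, clientId = config.clientId)
  var retriesRemaining = 1 + config.offsetsCommitRetries
  var done = false
  while (!done && retriesRemaining > 0) { // stop on success or exhaustion, never spin
    try {
      OffsetClient.commitOffsets(config, brokers, groupId, request)
      done = true
    } catch {
      case e: Exception =>
        retriesRemaining -= 1
        if (retriesRemaining > 0)
          TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs)
    }
  }
  done
}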
From 333ffc6c0da27069cf53f1467fe472cee83a75ca Mon Sep 17 00:00:00 2001
From: mgharat
Date: Sat, 27 Dec 2014 16:20:52 -0800
Subject: [PATCH 5/5] Added apache license

---
 .../src/main/scala/kafka/tools/KafkaOffsetClient.scala | 18 +++++++++++++++++-
 core/src/main/scala/kafka/tools/OffsetClient.scala     | 17 +++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala b/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala
index b560918..3b1e052 100644
--- a/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala
+++ b/core/src/main/scala/kafka/tools/KafkaOffsetClient.scala
@@ -1,8 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package kafka.tools
 
 import java.io.IOException
 import java.util.concurrent.TimeUnit
-
 import kafka.api._
 import kafka.common.{KafkaException, ErrorMapping}
 import kafka.network.BlockingChannel
diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
index 7a51443..1b9607a 100644
--- a/core/src/main/scala/kafka/tools/OffsetClient.scala
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package kafka.tools
 
 import kafka.api.{OffsetCommitResponse, OffsetCommitRequest, OffsetFetchResponse, OffsetFetchRequest}
-- 
1.9.3 (Apple Git-50)
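Note: the calls the two removed tools exercised, `OffsetClient.fetchTopicMetadata`, `OffsetClient.fetchOffsets`, and the `OffsetClientConfig` defaults, compose into a compact smoke test for the client this series introduces. A sketch using only signatures visible in these patches; the broker address, group, and topic names are placeholders:

import kafka.api.{OffsetFetchRequest, TopicMetadataResponse}
import kafka.common.TopicAndPartition
import kafka.utils.Utils

object OffsetClientSmokeTest {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint and names; substitute a real broker, group, and topic.
    val brokers = Utils.parseBrokerList("localhost:9092")
    val groupId = "test-group"

    // Build a config from the defaults this series defines on OffsetClientConfig.
    val config = OffsetClientConfig(
      OffsetClientConfig.OffsetsChannelSocketTimeoutMs,
      OffsetClientConfig.OffsetsChannelBackoffMs,
      OffsetClientConfig.OffsetsCommitMaxRetries)

    // Expand the topic into TopicAndPartitions via the metadata API, as ExportOffsets did.
    val metadata: TopicMetadataResponse =
      OffsetClient.fetchTopicMetadata(config, brokers, Seq("test-topic"))
    val partitions = metadata.topicsMetadata.flatMap(tm =>
      tm.partitionsMetadata.map(pm => new TopicAndPartition(tm.topic, pm.partitionId)))

    // Fetch committed offsets and print them in the export file's topic/partition/offset format.
    val request = OffsetFetchRequest(groupId, requestInfo = partitions, clientId = config.clientId)
    val response = OffsetClient.fetchOffsets(config, brokers, groupId, request)
    response.requestInfo.foreach { case (tp, om) =>
      println(tp.topic + "/" + tp.partition + "/" + om.offset)
    }
  }
}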