From ae5c47683e92682c936b7b2204b05eba914ec336 Mon Sep 17 00:00:00 2001 From: mgharat Date: Wed, 1 Oct 2014 21:03:58 -0700 Subject: [PATCH] Made changes to client, ExportOffsets,ImportOffsets and test cases --- core/src/main/scala/kafka/client/ClientUtils.scala | 101 +------- .../main/scala/kafka/consumer/SimpleConsumer.scala | 17 -- .../consumer/ZookeeperConsumerConnector.scala | 202 +++++---------- .../kafka/javaapi/consumer/SimpleConsumer.scala | 20 -- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 5 +- .../src/main/scala/kafka/tools/ExportOffsets.scala | 227 ++++++++++++++++ .../main/scala/kafka/tools/ExportZkOffsets.scala | 124 --------- .../src/main/scala/kafka/tools/ImportOffsets.scala | 229 ++++++++++++++++ .../main/scala/kafka/tools/ImportZkOffsets.scala | 106 -------- core/src/main/scala/kafka/tools/OffsetClient.scala | 287 +++++++++++++++++++++ .../scala/kafka/tools/OffsetClientConfig.scala | 23 ++ core/src/main/scala/kafka/utils/Utils.scala | 19 ++ .../test/scala/other/kafka/TestOffsetManager.scala | 13 +- .../scala/unit/kafka/server/OffsetCommitTest.scala | 38 ++- 14 files changed, 891 insertions(+), 520 deletions(-) create mode 100644 core/src/main/scala/kafka/tools/ExportOffsets.scala delete mode 100644 core/src/main/scala/kafka/tools/ExportZkOffsets.scala create mode 100644 core/src/main/scala/kafka/tools/ImportOffsets.scala delete mode 100644 core/src/main/scala/kafka/tools/ImportZkOffsets.scala create mode 100644 core/src/main/scala/kafka/tools/OffsetClient.scala create mode 100644 core/src/main/scala/kafka/tools/OffsetClientConfig.scala diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index ebba87f..03a5786 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -16,12 +16,14 @@ */ package kafka.client -import scala.collection._ + import kafka.tools.{OffsetClient, OffsetClientConfig} + + import scala.collection._ import kafka.cluster._ import kafka.api._ import kafka.producer._ import kafka.common.{ErrorMapping, KafkaException} -import kafka.utils.{Utils, Logging} +import kafka.utils.{ZkUtils, Utils, Logging} import java.util.Properties import util.Random import kafka.network.BlockingChannel @@ -43,36 +45,9 @@ object ClientUtils extends Logging{ * @return topic metadata response */ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { - var fetchMetaDataSucceeded: Boolean = false - var i: Int = 0 - val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) - var topicMetadataResponse: TopicMetadataResponse = null - var t: Throwable = null - // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the - // same broker - val shuffledBrokers = Random.shuffle(brokers) - while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) { - val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i)) - info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics)) - try { - topicMetadataResponse = producer.send(topicMetadataRequest) - fetchMetaDataSucceeded = true - } - catch { - case e: Throwable => - warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed" - 
.format(correlationId, topics, shuffledBrokers(i).toString), e) - t = e - } finally { - i = i + 1 - producer.close() - } - } - if(!fetchMetaDataSucceeded) { - throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t) - } else { - debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) - } + val config = new OffsetClientConfig(producerConfig.requestTimeoutMs, producerConfig.retryBackoffMs, producerConfig.messageSendMaxRetries, producerConfig.clientId) + val allTopics = topics.toSeq + val topicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, allTopics) return topicMetadataResponse } @@ -137,64 +112,8 @@ object ClientUtils extends Logging{ * Creates a blocking channel to the offset manager of the given group */ def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = { - var queryChannel = channelToAnyBroker(zkClient) - - var offsetManagerChannelOpt: Option[BlockingChannel] = None - - while (!offsetManagerChannelOpt.isDefined) { - - var coordinatorOpt: Option[Broker] = None - - while (!coordinatorOpt.isDefined) { - try { - if (!queryChannel.isConnected) - queryChannel = channelToAnyBroker(zkClient) - debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) - queryChannel.send(ConsumerMetadataRequest(group)) - val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) - debug("Consumer metadata response: " + consumerMetadataResponse.toString) - if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) - coordinatorOpt = consumerMetadataResponse.coordinatorOpt - else { - debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds." 
- .format(queryChannel.host, queryChannel.port, group, retryBackOffMs)) - Thread.sleep(retryBackOffMs) - } - } - catch { - case ioe: IOException => - info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port)) - queryChannel.disconnect() - } - } - - val coordinator = coordinatorOpt.get - if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) { - offsetManagerChannelOpt = Some(queryChannel) - } else { - val connectString = "%s:%d".format(coordinator.host, coordinator.port) - var offsetManagerChannel: BlockingChannel = null - try { - debug("Connecting to offset manager %s.".format(connectString)) - offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - socketTimeoutMs) - offsetManagerChannel.connect() - offsetManagerChannelOpt = Some(offsetManagerChannel) - queryChannel.disconnect() - } - catch { - case ioe: IOException => // offsets manager may have moved - info("Error while connecting to %s.".format(connectString)) - if (offsetManagerChannel != null) offsetManagerChannel.disconnect() - Thread.sleep(retryBackOffMs) - offsetManagerChannelOpt = None // just in case someone decides to change shutdownChannel to not swallow exceptions - } - } - } - - offsetManagerChannelOpt.get + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val config = OffsetClientConfig(socketTimeoutMs, retryBackOffMs) + OffsetClient.getOffsetManagerChannel(config, brokers, group).get } } diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index d349a30..454d82e 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -127,23 +127,6 @@ class SimpleConsumer(val host: String, */ def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) - /** - * Commit offsets for a topic - * @param request a [[kafka.api.OffsetCommitRequest]] object. - * @return a [[kafka.api.OffsetCommitResponse]] object. - */ - def commitOffsets(request: OffsetCommitRequest) = { - // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before - // we can commit offsets. - OffsetCommitResponse.readFrom(sendRequest(request).buffer) - } - - /** - * Fetch offsets for a topic - * @param request a [[kafka.api.OffsetFetchRequest]] object. - * @return a [[kafka.api.OffsetFetchResponse]] object. 
- */ - def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) private def getOrMakeConnection() { if(!isClosed && !blockingChannel.isConnected) { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..ec096bc 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -31,6 +31,7 @@ import kafka.common._ import kafka.metrics._ import kafka.network.BlockingChannel import kafka.serializer._ +import kafka.tools.{OffsetClient, OffsetClientConfig} import kafka.utils.Utils.inLock import kafka.utils.ZkUtils._ import kafka.utils._ @@ -98,7 +99,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null private var loadBalancerListener: ZKRebalancerListener = null - private var offsetsChannel: BlockingChannel = null private val offsetsChannelLock = new Object private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null @@ -125,7 +125,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, connectZk() createFetcher() - ensureOffsetManagerConnected() if (config.autoCommitEnable) { scheduler.startup @@ -170,16 +169,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) } - // Blocks until the offset manager is located and a channel is established to it. - private def ensureOffsetManagerConnected() { - if (config.offsetsStorage == "kafka") { - if (offsetsChannel == null || !offsetsChannel.isConnected) - offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs) - - debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port)) - } - } - def shutdown() { val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown) { @@ -204,7 +193,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient = null } - if (offsetsChannel != null) offsetsChannel.disconnect() } catch { case e: Throwable => fatal("error during consumer connector shutdown", e) @@ -286,90 +274,57 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } def commitOffsets(isAutoCommit: Boolean = true) { - var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit + val clientConfig = OffsetClientConfig(config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs, config.offsetsCommitMaxRetries, config.clientId) + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => + partitionTopicInfos.map { case (partition, info) => + TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) + } + }.toSeq:_*) + val group = config.groupId var done = false - - while (!done) { - val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors - val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => - partitionTopicInfos.map { case 
(partition, info) => - TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) + var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) + + if (offsetsToCommit.size <= 0){ + debug("No updates to offsets since last commit.") + } + else + { + while(!done) + { + var committed = false + if (config.offsetsStorage == "zookeeper") { + offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) => + commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) } - }.toSeq:_*) - - if (offsetsToCommit.size > 0) { - if (config.offsetsStorage == "zookeeper") { - offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) => - commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) - } - true - } else { + committed = true + } + else + { + try { val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId) - ensureOffsetManagerConnected() - try { - kafkaCommitMeter.mark(offsetsToCommit.size) - offsetsChannel.send(offsetCommitRequest) - val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) - trace("Offset commit response: %s.".format(offsetCommitResponse)) - - val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { - offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) => - - if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) { - val offset = offsetsToCommit(topicPartition).offset - commitOffsetToZooKeeper(topicPartition, offset) - } - - (folded._1 || // update commitFailed - errorCode != ErrorMapping.NoError, - - folded._2 || // update retryableIfFailed - (only metadata too large is not retryable) - (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), - - folded._3 || // update shouldRefreshCoordinator - errorCode == ErrorMapping.NotCoordinatorForConsumerCode || - errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, - - // update error count - folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) - } - } - debug(errorCount + " errors in offset commit response.") - - - if (shouldRefreshCoordinator) { - debug("Could not commit offsets (because offset coordinator has moved or is unavailable).") - offsetsChannel.disconnect() - } + kafkaCommitMeter.mark(offsetsToCommit.size) + val offsetCommitResponse = OffsetClient.commitOffsets(clientConfig, brokers, group, offsetCommitRequest) + committed = true - if (commitFailed && retryableIfFailed) - false - else - true - } - catch { - case t: Throwable => - error("Error while committing offsets.", t) - offsetsChannel.disconnect() - false - } } - } else { - debug("No updates to offsets since last commit.") - true + catch{ + case e: Exception => + committed = false + } } - } - done = if (isShuttingDown.get() && isAutoCommit) { // should not retry indefinitely if shutting down - retriesRemaining -= 1 - retriesRemaining == 0 || committed - } else - true + done = if (isShuttingDown.get() && isAutoCommit) { // should not retry indefinitely if shutting down + retriesRemaining -= 1 + retriesRemaining == 0 || committed + } else + true - if (!done) { - debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs)) - Thread.sleep(config.offsetsChannelBackoffMs) - } + if (!done) { + debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs)) + Thread.sleep(config.offsetsChannelBackoffMs) + } + } } } @@ 
-387,62 +342,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, Some(OffsetFetchResponse(Map.empty)) else if (config.offsetsStorage == "zookeeper") { val offsets = partitions.map(fetchOffsetFromZooKeeper) - Some(OffsetFetchResponse(immutable.Map(offsets:_*))) - } else { + Some(OffsetFetchResponse(immutable.Map(offsets: _*))) + } + else + { val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = partitions, clientId = config.clientId) - var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None while (!isShuttingDown.get && !offsetFetchResponseOpt.isDefined) { offsetFetchResponseOpt = offsetsChannelLock synchronized { - ensureOffsetManagerConnected() - try { - offsetsChannel.send(offsetFetchRequest) - val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer) - trace("Offset fetch response: %s.".format(offsetFetchResponse)) - - val (leaderChanged, loadInProgress) = - offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) => - (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode), - folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode)) - } - - if (leaderChanged) { - offsetsChannel.disconnect() - debug("Could not fetch offsets (because offset manager has moved).") - None // retry - } - else if (loadInProgress) { - debug("Could not fetch offsets (because offset cache is being loaded).") - None // retry - } - else { - if (config.dualCommitEnabled) { - // if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the - // maximum between offsets in zookeeper and kafka. - val kafkaOffsets = offsetFetchResponse.requestInfo - val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) => - val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset - val mostRecentOffset = zkOffset.max(kafkaOffset.offset) - (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError)) - } - Some(OffsetFetchResponse(mostRecentOffsets)) - } - else - Some(offsetFetchResponse) + val clientConfig = OffsetClientConfig(config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs, config.offsetsCommitMaxRetries, config.clientId) + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val group = config.groupId + val offsetFetchResponse = OffsetClient.fetchOffsets(clientConfig, brokers, group, offsetFetchRequest) + + if (config.dualCommitEnabled) { + // if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the + // maximum between offsets in zookeeper and kafka. + val kafkaOffsets = offsetFetchResponse.requestInfo + val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) => + val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset + val mostRecentOffset = zkOffset.max(kafkaOffset.offset) + (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError)) } + Some(OffsetFetchResponse(mostRecentOffsets)) } - catch { - case e: Exception => - warn("Error while fetching offsets from %s:%d. 
Possible cause: %s".format(offsetsChannel.host, offsetsChannel.port, e.getMessage)) - offsetsChannel.disconnect() - None // retry + else + { + Some(offsetFetchResponse) } } - - if (offsetFetchResponseOpt.isEmpty) { - debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs)) - Thread.sleep(config.offsetsChannelBackoffMs) - } } offsetFetchResponseOpt diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 0ab0195..cba075d 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -79,26 +79,6 @@ class SimpleConsumer(val host: String, underlying.getOffsetsBefore(request.underlying) } - /** - * Commit offsets for a topic - * @param request a [[kafka.javaapi.OffsetCommitRequest]] object. - * @return a [[kafka.javaapi.OffsetCommitResponse]] object. - */ - def commitOffsets(request: kafka.javaapi.OffsetCommitRequest): kafka.javaapi.OffsetCommitResponse = { - import kafka.javaapi.Implicits._ - underlying.commitOffsets(request.underlying) - } - - /** - * Fetch offsets for a topic - * @param request a [[kafka.javaapi.OffsetFetchRequest]] object. - * @return a [[kafka.javaapi.OffsetFetchResponse]] object. - */ - def fetchOffsets(request: kafka.javaapi.OffsetFetchRequest): kafka.javaapi.OffsetFetchResponse = { - import kafka.javaapi.Implicits._ - underlying.fetchOffsets(request.underlying) - } - def close() { underlying.close } diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..5c9c65c 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -158,7 +158,10 @@ object ConsumerOffsetChecker extends Logging { topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) + val config = OffsetClientConfig(channelSocketTimeoutMs, channelRetryBackoffMs) + val brokers = ZkUtils.getAllBrokersInCluster(zkClient) + val channel = OffsetClient.getOffsetManagerChannel(config, brokers, group).get + // val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) diff --git a/core/src/main/scala/kafka/tools/ExportOffsets.scala b/core/src/main/scala/kafka/tools/ExportOffsets.scala new file mode 100644 index 0000000..d32b2fe --- /dev/null +++ b/core/src/main/scala/kafka/tools/ExportOffsets.scala @@ -0,0 +1,227 @@ +package kafka.tools + +import java.io.{BufferedWriter, File, FileWriter} +import joptsimple._ +import kafka.api.{OffsetFetchRequest, TopicMetadataResponse, OffsetFetchResponse} +import kafka.cluster.Broker +import kafka.common.TopicAndPartition +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient + + +/** + * A utility that retrieve the offset of broker partitions in ZK or OffsetManager and + * prints to an output file in the following format: + * + * ZK format + * /consumers/group1/offsets/topic1/1-0:286894308 + * 
/consumers/group1/offsets/topic1/2-0:284803985 + * + * OffsetManager file format + * topic/partition/offset + * + * This utility expects following arguments: + * For using Zookeeper : + * 1. Zk host:port string + * 2. group name (all groups implied if omitted) + * 3. output filename + * + * For using OffsetManager + * 1. group name + * 2. output filename + * 3. broker list (host:port) + * 4. config file (optional) + * 5. topic list (optional) + * + * To print debug message, add the following line to log4j.properties: + * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG + * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) + */ +object ExportOffsets extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + + val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") + .withRequiredArg() + .defaultsTo("localhost:2181") + .ofType(classOf[String]) + + val groupOpt = parser.accepts("group", "Consumer group.") + .withRequiredArg() + .ofType(classOf[String]) + + val outFileOpt = parser.accepts("output-file", "Output file") + .withRequiredArg() + .ofType(classOf[String]) + + val brokerList = parser.accepts("broker-list", "List of brokers") + .withRequiredArg() + .ofType(classOf[String]) + + val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket Time out for the Offset channel") + .withRequiredArg() + .ofType(classOf[String]) + + val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect") + .withRequiredArg() + .ofType(classOf[String]) + + val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for commiting offsets") + .withRequiredArg() + .ofType(classOf[String]) + + val topicList = parser.accepts("topics", "Topics for which offsets/Partitions should be fetched") + .withRequiredArg() + .ofType(classOf[String]) + + parser.accepts("help", "Print this message.") + + if (args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.") + + val options = parser.parse(args: _*) + + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + + val allBrokers = options.valueOf(brokerList) + var brokers = Utils.parseBrokerList(allBrokers) + + var topics = collection.Seq[String]() + var topicListValue = options.valueOf(topicList) + if (topicListValue != null) { + topics = topicListValue.split(",").toSeq + } + + var offsetClientConfig: OffsetClientConfig = null + var value : String = null + val socketTimeOut = { + value = options.valueOf(offsetsChannelSocketTimeoutMs) + if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs + } + + val channelBackOff = { + value = options.valueOf(offsetsChannelBackoffMs) + if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs + } + + val commitRetries = { + value = options.valueOf(offsetsCommitMaxRetries) + if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries + } + + val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries) + + val outfile = options.valueOf(outFileOpt) + + val groupId = options.valueOf(groupOpt) + + // checking whether to use zookeeper or offsetManager + if (allBrokers != null) { + val file = new File(outfile) + var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None + var topicAndPartition = collection.Seq[TopicAndPartition]() + + var offsetsChannelBackoffMs = 0 + if 
(config != null) { + offsetsChannelBackoffMs = config.offsetsChannelBackoffMs + } + else { + offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs + } + + if (topics.size > 0) { + val topicMetaDataResponse: TopicMetadataResponse = OffsetClient.fetchTopicMetadata(config, brokers, topics) + topicAndPartition = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => { + val topic = topicMetaData.topic + topicMetaData.partitionsMetadata.map(partitionMetadata => { + new TopicAndPartition(topic, partitionMetadata.partitionId) + } + ) + } + ) + } + + val offsetFetchRequest = OffsetFetchRequest(groupId, requestInfo = topicAndPartition, clientId = config.clientId) + + while (!offSetFetchResponseOpt.isDefined) { + val response = OffsetClient.fetchOffsets(config, brokers, groupId, offsetFetchRequest) + println(response) + offSetFetchResponseOpt = Some(response) + } + val offsetFetchResponse = offSetFetchResponseOpt.get + val topicPartitionOffsetMap = offsetFetchResponse.requestInfo + + // writing to file + val writer = new BufferedWriter(new FileWriter(file)) + topicPartitionOffsetMap.foreach(tpo => { + val topic = tpo._1.topic + val partition = tpo._1.partition + val offset = tpo._2.offset + writer.write(topic + "/" + partition + "/" + offset) + writer.newLine() + }) + writer.flush() + writer.close() + } + else { + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) + + val zkConnect = options.valueOf(zkConnectOpt) + val groups = options.valuesOf(groupOpt) + + var zkClient: ZkClient = null + val fileWriter: FileWriter = new FileWriter(outfile) + + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + var consumerGroups: Seq[String] = null + + if (groups.size == 0) { + consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList + } + else { + import scala.collection.JavaConversions._ + consumerGroups = groups + } + + for (consumerGrp <- consumerGroups) { + val topicsList = getTopicsList(zkClient, consumerGrp) + + for (topic <- topicsList) { + val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) + + for (bidPid <- bidPidList) { + val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic) + val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid + ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { + case Some(offsetVal) => + fileWriter.write(offsetPath + ":" + offsetVal + "\n") + debug(offsetPath + " => " + offsetVal) + case None => + error("Could not retrieve offset value from " + offsetPath) + } + } + } + } + } + finally { + fileWriter.flush() + fileWriter.close() + } + } + } + + private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { + return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList + } + + private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { + return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList + } +} + diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala deleted file mode 100644 index 4d051bc..0000000 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.io.FileWriter -import joptsimple._ -import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils} -import org.I0Itec.zkclient.ZkClient - - -/** - * A utility that retrieve the offset of broker partitions in ZK and - * prints to an output file in the following format: - * - * /consumers/group1/offsets/topic1/1-0:286894308 - * /consumers/group1/offsets/topic1/2-0:284803985 - * - * This utility expects 3 arguments: - * 1. Zk host:port string - * 2. group name (all groups implied if omitted) - * 3. output filename - * - * To print debug message, add the following line to log4j.properties: - * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG - * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) - */ -object ExportZkOffsets extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") - .withRequiredArg() - .defaultsTo("localhost:2181") - .ofType(classOf[String]) - val groupOpt = parser.accepts("group", "Consumer group.") - .withRequiredArg() - .ofType(classOf[String]) - val outFileOpt = parser.accepts("output-file", "Output file") - .withRequiredArg() - .ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - val groups = options.valuesOf(groupOpt) - val outfile = options.valueOf(outFileOpt) - - var zkClient : ZkClient = null - val fileWriter : FileWriter = new FileWriter(outfile) - - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - - var consumerGroups: Seq[String] = null - - if (groups.size == 0) { - consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList - } - else { - import scala.collection.JavaConversions._ - consumerGroups = groups - } - - for (consumerGrp <- consumerGroups) { - val topicsList = getTopicsList(zkClient, consumerGrp) - - for (topic <- topicsList) { - val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic) - - for (bidPid <- bidPidList) { - val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic) - val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid - ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match { - case Some(offsetVal) => - fileWriter.write(offsetPath + ":" + offsetVal + "\n") - debug(offsetPath + " => " + offsetVal) - case None => - error("Could not retrieve offset value from " + offsetPath) - } - } - } - } - } - finally { - fileWriter.flush() - fileWriter.close() - } - } - - private def 
getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = {
-    return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
-  }
-
-  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = {
-    return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
-  }
-}
diff --git a/core/src/main/scala/kafka/tools/ImportOffsets.scala b/core/src/main/scala/kafka/tools/ImportOffsets.scala
new file mode 100644
index 0000000..9546644
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ImportOffsets.scala
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.BufferedReader
+import java.io.FileReader
+import java.util.concurrent.TimeUnit
+import joptsimple._
+import kafka.api.OffsetCommitRequest
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+
+
+/**
+ * A utility that updates the offsets of broker partitions in ZK or the offset manager.
+ *
+ * This utility expects the following arguments:
+ *
+ * For using ZooKeeper:
+ * 1. consumer properties file
+ * 2. a file containing partition offset data, such as:
+ *    (this output data file can be obtained by running kafka.tools.ExportOffsets)
+ *
+ * For using the offset manager:
+ * 1. group name
+ * 2. input filename (file to read from)
+ * 3. broker list (host:port)
+ * 4. config file (optional)
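+ *
+ * Example invocation for the offset manager path (illustrative; the host, group,
+ * and file name below are assumptions, not defaults):
+ *   bin/kafka-run-class.sh kafka.tools.ImportOffsets --broker-list localhost:9092 \
+ *     --group example-group --input-file /tmp/offsets.dump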
+ *
+ * ZK format:
+ * /consumers/group1/offsets/topic1/3-0:285038193
+ * /consumers/group1/offsets/topic1/1-0:286894308
+ *
+ * Offset manager file format:
+ * topic/partition/offset
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ * log4j.logger.kafka.tools.ImportOffsets$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+
+import scala.io.Source
+
+object ImportOffsets extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+
+    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
+      .withRequiredArg()
+      .defaultsTo("localhost:2181")
+      .ofType(classOf[String])
+
+    val inFileOpt = parser.accepts("input-file", "Input file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelSocketTimeoutMs = parser.accepts("OffsetSocketChannelTimeout", "Socket timeout for the offset channel")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsChannelBackoffMs = parser.accepts("OffsetChannelBackOffMs", "Time to wait before retrying to connect")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val offsetsCommitMaxRetries = parser.accepts("OffsetCommitMaxRetries", "Max number of retries for committing offsets")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val group = parser.accepts("group", "Consumer Group Id")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    parser.accepts("help", "Print this message.")
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Import consumer offsets from a file into ZooKeeper or the offset manager.")
+
+    val options = parser.parse(args: _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val allBrokers = options.valueOf(brokerList)
+    val brokers = Utils.parseBrokerList(allBrokers)
+
+    val socketTimeOut = {
+      val value = options.valueOf(offsetsChannelSocketTimeoutMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelSocketTimeoutMs
+    }
+
+    val channelBackOff = {
+      val value = options.valueOf(offsetsChannelBackoffMs)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsChannelBackoffMs
+    }
+
+    val commitRetries = {
+      val value = options.valueOf(offsetsCommitMaxRetries)
+      if (value != null) value.toInt else OffsetClientConfig.OffsetsCommitMaxRetries
+    }
+
+    val config = OffsetClientConfig(socketTimeOut, channelBackOff, commitRetries)
+
+    val partitionOffsetFile = options.valueOf(inFileOpt)
+    val groupId: String = options.valueOf(group)
+
+    // Use the offset manager when a broker list is supplied; otherwise fall back to ZooKeeper.
+    if (allBrokers != null) {
+      val fileLines = Source.fromFile(partitionOffsetFile).getLines().toList
+      val offsetsToCommit = fileLines.map(line => {
+        val topicPartitionOffset = line.split("/")
+        val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt)
+        // Offsets are Longs; parsing with toInt would overflow on large offsets.
+        val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toLong)
+        println(topicAndPartition + " -> " + offsetMetadata)
+        (topicAndPartition -> offsetMetadata)
+      }).toMap
+
+      var done = false
+      var offsetsCommitRetries = 0
+      var offsetsChannelBackoffMS = 0
+
+      if (config != null) {
+        offsetsCommitRetries = config.offsetsCommitRetries
+        offsetsChannelBackoffMS = config.offsetsChannelBackoffMs
+      }
+      else {
+        offsetsCommitRetries = OffsetClientConfig.OffsetsCommitMaxRetries
+        offsetsChannelBackoffMS = OffsetClientConfig.OffsetsChannelBackoffMs
+      }
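+
+      // Commit with a bounded retry budget: at most (1 + offsetsCommitRetries)
+      // attempts, backing off offsetsChannelBackoffMS between failed attempts.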
+      var retriesRemaining = 1 + offsetsCommitRetries
+
+      if (offsetsToCommit.size > 0) {
+        val offsetCommitRequest = OffsetCommitRequest(groupId, offsetsToCommit, clientId = config.clientId)
+
+        while (!done && retriesRemaining > 0) {
+          try {
+            OffsetClient.commitOffsets(config, brokers, groupId, offsetCommitRequest)
+            done = true
+          }
+          catch {
+            case e: Exception =>
+              error("Error while committing offsets.", e)
+              done = false
+          }
+          retriesRemaining = retriesRemaining - 1
+
+          if (!done && retriesRemaining > 0) {
+            debug("Retrying offset commit in %d ms".format(offsetsChannelBackoffMS))
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMS)
+          }
+        }
+      }
+      else {
+        debug("No updates to offsets since last commit.")
+      }
+    }
+    else {
+      CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
+
+      val zkConnect = options.valueOf(zkConnectOpt)
+
+      val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile)
+
+      updateZkOffsets(zkClient, partitionOffsets)
+    }
+  }
+
+  private def getPartitionOffsetsFromFile(filename: String): Map[String, String] = {
+    val fr = new FileReader(filename)
+    val br = new BufferedReader(fr)
+    var partOffsetsMap: Map[String, String] = Map()
+
+    var s: String = br.readLine()
+    while (s != null && s.length() >= 1) {
+      val tokens = s.split(":")
+
+      partOffsetsMap += tokens(0) -> tokens(1)
+      debug("adding node path [" + s + "]")
+
+      s = br.readLine()
+    }
+
+    partOffsetsMap
+  }
+
+  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String, String]): Unit = {
+    for ((partition, offset) <- partitionOffsets) {
+      debug("updating [" + partition + "] with offset [" + offset + "]")
+
+      try {
+        ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
+      } catch {
+        case e: Throwable => e.printStackTrace()
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
deleted file mode 100644
index abe0972..0000000
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io.BufferedReader
-import java.io.FileReader
-import joptsimple._
-import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
-import org.I0Itec.zkclient.ZkClient
-
-
-/**
- * A utility that updates the offset of broker partitions in ZK.
- *
- * This utility expects 2 input files as arguments:
- * 1. consumer properties file
- * 2.
a file contains partition offsets data such as: - * (This output data file can be obtained by running kafka.tools.ExportZkOffsets) - * - * /consumers/group1/offsets/topic1/3-0:285038193 - * /consumers/group1/offsets/topic1/1-0:286894308 - * - * To print debug message, add the following line to log4j.properties: - * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG - * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) - */ -object ImportZkOffsets extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") - .withRequiredArg() - .defaultsTo("localhost:2181") - .ofType(classOf[String]) - val inFileOpt = parser.accepts("input-file", "Input file") - .withRequiredArg() - .ofType(classOf[String]) - parser.accepts("help", "Print this message.") - - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.") - - val options = parser.parse(args : _*) - - if (options.has("help")) { - parser.printHelpOn(System.out) - System.exit(0) - } - - CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt) - - val zkConnect = options.valueOf(zkConnectOpt) - val partitionOffsetFile = options.valueOf(inFileOpt) - - val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile) - - updateZkOffsets(zkClient, partitionOffsets) - } - - private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = { - val fr = new FileReader(filename) - val br = new BufferedReader(fr) - var partOffsetsMap: Map[String,String] = Map() - - var s: String = br.readLine() - while ( s != null && s.length() >= 1) { - val tokens = s.split(":") - - partOffsetsMap += tokens(0) -> tokens(1) - debug("adding node path [" + s + "]") - - s = br.readLine() - } - - return partOffsetsMap - } - - private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = { - for ((partition, offset) <- partitionOffsets) { - debug("updating [" + partition + "] with offset [" + offset + "]") - - try { - ZkUtils.updatePersistentPath(zkClient, partition, offset.toString) - } catch { - case e: Throwable => e.printStackTrace() - } - } - } -} diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala new file mode 100644 index 0000000..37372b6 --- /dev/null +++ b/core/src/main/scala/kafka/tools/OffsetClient.scala @@ -0,0 +1,287 @@ +package kafka.tools + +import java.io.IOException +import java.util.concurrent.TimeUnit + +import kafka.api._ +import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition} +import kafka.network.BlockingChannel +import kafka.utils.Logging +import kafka.cluster.Broker +import scala.util.Random + +/** + * A utility that provides APIs to fetch and commit offsets + * + */ +object OffsetClient extends Logging { + + private def getQueryChannel(config: OffsetClientConfig, brokers: Seq[Broker]): BlockingChannel = { + var socketTimeoutMs = { + if (config != null) + config.offsetsChannelSocketTimeoutMs + else + OffsetClientConfig.OffsetsChannelSocketTimeoutMs + } + var channel: BlockingChannel = null + var connected = false + val shuffledBrokers = Random.shuffle(brokers) + var i: Int = 0 + + while (!connected) { + val broker = shuffledBrokers(i) + i = (i + 1) % shuffledBrokers.size + trace("Connecting to broker...") + try { + channel = new 
BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+        channel.connect()
+        debug("Created channel to broker [%s:%d].".format(channel.host, channel.port))
+      }
+      catch {
+        case e: Exception =>
+          if (channel != null) channel.disconnect()
+          channel = null
+          error("Error while creating channel to [%s:%d]. Retrying...".format(broker.host, broker.port))
+      }
+      connected = channel != null && channel.isConnected
+    }
+
+    channel
+  }
+
+  /**
+   * Creates a blocking channel to the offset manager of the given group
+   */
+  def getOffsetManagerChannel(config: OffsetClientConfig, brokers: Seq[Broker], group: String): Option[BlockingChannel] = {
+    val (offsetsChannelBackoffMs, socketTimeoutMs) = {
+      if (config != null) {
+        (config.offsetsChannelBackoffMs, config.offsetsChannelSocketTimeoutMs)
+      }
+      else {
+        (OffsetClientConfig.OffsetsChannelBackoffMs, OffsetClientConfig.OffsetsChannelSocketTimeoutMs)
+      }
+    }
+    var offSetManagerChannel: BlockingChannel = null
+    var queryChannel = getQueryChannel(config, brokers)
+    var offSetManagerChannelOpt: Option[BlockingChannel] = None
+
+    while (!offSetManagerChannelOpt.isDefined) {
+
+      var offsetManagerOpt: Option[Broker] = None
+
+      while (!offsetManagerOpt.isDefined) {
+        try {
+          if (!queryChannel.isConnected) {
+            queryChannel = getQueryChannel(config, brokers)
+          }
+          debug("Querying [%s:%d] to locate offset manager for group [%s].".format(queryChannel.host, queryChannel.port, group))
+          val clientId = if (config != null) config.clientId else OffsetClientConfig.DefaultClientId
+          queryChannel.send(ConsumerMetadataRequest(group, ConsumerMetadataRequest.CurrentVersion, 0, clientId))
+          val response = queryChannel.receive()
+          val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
+          debug("Consumer metadata response: " + consumerMetadataResponse)
+          if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) {
+            offsetManagerOpt = consumerMetadataResponse.coordinatorOpt
+          }
+          else {
+            debug("Query to [%s:%d] to locate offset manager for group [%s] failed - will retry in %d milliseconds."
+ .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMs)) + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + } + } + catch { + case ioe: IOException => + error("Error while connecting to [%s:%d] for fetching consumer metadata".format(queryChannel.host, queryChannel.port)) + queryChannel.disconnect() + } + } + + val offsetManager = offsetManagerOpt.get + if (offsetManager.host == queryChannel.host && offsetManager.port == queryChannel.port) { + offSetManagerChannelOpt = Some(queryChannel) + } + else { + val connectString = "[%s:%d]".format(offsetManager.host, offsetManager.port) + try { + offSetManagerChannel = new BlockingChannel(offsetManager.host, offsetManager.port, BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) + offSetManagerChannel.connect() + offSetManagerChannelOpt = Some(offSetManagerChannel) + info("Connected to offset manager [%s:%d] for group [%s].".format(offSetManagerChannel.host, offSetManagerChannel.port, group)) + queryChannel.disconnect() + } + catch { + case ioe: IOException => + error("Error while connecting to %s.".format(connectString)) + if (offSetManagerChannel != null) offSetManagerChannel.disconnect() + TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs) + offSetManagerChannelOpt = None + } + } + } + + offSetManagerChannelOpt + } + + def fetchTopicMetadata(config: OffsetClientConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = { + var topicAndPartitions = List[TopicAndPartition]() + var fetchMetadataSucceeded = false + var i: Int = 0 + var topicMetadataChannel = getQueryChannel(config, brokers) + var topicMetaDataResponse: TopicMetadataResponse = null + var t: Throwable = null + + while (i < brokers.size && !fetchMetadataSucceeded) { + try { + if (!topicMetadataChannel.isConnected) { + topicMetadataChannel = getQueryChannel(config, brokers) + } + info("Fetching metadata from broker [%s:%d]".format(topicMetadataChannel.host, topicMetadataChannel.port)) + val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics) + topicMetadataChannel.send(topicMetaDataRequest) + topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer) + fetchMetadataSucceeded = true + topicMetadataChannel.disconnect() + } + catch { + case e: Exception => + warn("Fetching metadata for topics %s from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port)) + topicMetadataChannel.disconnect() + t = e + } + finally + { + i = i + 1 + } + + // TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs) + } + + if(!fetchMetadataSucceeded) { + throw new KafkaException("fetching topic metadata for topics %s from broker [%s] failed".format(topics, brokers), t) + } else { + debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) + } + + topicMetaDataResponse + } + + + def fetchOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse = { + var offsetsChannelBackoffMs = 0 + if (config != null) { + offsetsChannelBackoffMs = config.offsetsChannelBackoffMs + } + else { + offsetsChannelBackoffMs = OffsetClientConfig.OffsetsChannelBackoffMs + } + var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None + while (!offsetFetchResponseOpt.isDefined) { + val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get + try { + 
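// Ask the coordinator for this group's committed offsets. NotCoordinatorForConsumer
+          // and OffsetsLoadInProgress error codes are treated as retryable: the response
+          // option is left empty so the enclosing loop backs off, re-locates the
+          // coordinator, and tries again.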
+          offsetManagerChannel.send(offsetFetchRequest)
+          val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer)
+          trace("Offset fetch response: %s.".format(offsetFetchResponse))
+
+          val (leaderChanged, loadInProgress) =
+            offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) =>
+              (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
+               folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+            }
+
+          if (leaderChanged) {
+            offsetManagerChannel.disconnect()
+            debug("Could not fetch offsets (because offset manager has moved).")
+          }
+          else if (loadInProgress) {
+            debug("Could not fetch offsets (because offset cache is being loaded).")
+          }
+          else {
+            offsetFetchResponseOpt = Some(offsetFetchResponse)
+            offsetManagerChannel.disconnect()
+          }
+        }
+        catch {
+          case e: Exception =>
+            warn("Error while fetching offsets from [%s:%d].".format(offsetManagerChannel.host, offsetManagerChannel.port), e)
+        }
+        finally {
+          if (offsetManagerChannel.isConnected)
+            offsetManagerChannel.disconnect()
+        }
+
+        if (offsetFetchResponseOpt.isEmpty) {
+          debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMs))
+          TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+        }
+      }
+
+      offsetFetchResponseOpt.get
+    }
+
+  def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse = {
+    var offsetCommitResponse: OffsetCommitResponse = null
+
+    val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get
+    try {
+      offsetManagerChannel.send(offsetCommitRequest)
+      offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer)
+      trace("Offset commit response: %s.".format(offsetCommitResponse))
+
+      val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
+        offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
+
+          (folded._1 || // update commitFailed
+            errorCode != ErrorMapping.NoError,
+
+           folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
+            (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+
+           folded._3 || // update shouldRefreshCoordinator
+            errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+            errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+
+           // update error count
+           folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+        }
+      }
+      debug(errorCount + " errors in offset commit response.")
+
+      if (shouldRefreshCoordinator) {
+        debug("Could not commit offsets (because offset manager has moved or is unavailable).")
+        offsetManagerChannel.disconnect()
+      }
+
+      if (commitFailed && retryableIfFailed)
+        debug("Offset commit failed with a retryable error; callers may retry.")
+      else
+        offsetManagerChannel.disconnect()
+    }
+    catch {
+      case t: Throwable =>
+        error("Error while committing offsets.", t)
+        offsetManagerChannel.disconnect()
+        throw new KafkaException(t)
+    }
+    finally {
+      if (offsetManagerChannel.isConnected)
+        offsetManagerChannel.disconnect()
+    }
+
+    offsetCommitResponse
+  }
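+
+  // Illustrative usage sketch (editor's addition, not part of the original change;
+  // the broker address, group, and topic names below are assumptions). It fetches
+  // the committed offset for one partition and commits it back, mirroring how the
+  // tools above build OffsetFetchRequest and OffsetCommitRequest.
+  def exampleRoundTrip() {
+    val brokers = kafka.utils.Utils.parseBrokerList("localhost:9092")
+    val config = OffsetClientConfig(clientId = "offset-client-example")
+    val partition = TopicAndPartition("example-topic", 0)
+    val fetched = fetchOffsets(config, brokers, "example-group", OffsetFetchRequest("example-group", Seq(partition)))
+    val lastCommitted = fetched.requestInfo(partition).offset
+    // Re-commit the offset we just read, as ImportOffsets does for offsets read from a file.
+    commitOffsets(config, brokers, "example-group",
+      OffsetCommitRequest("example-group", Map(partition -> kafka.common.OffsetAndMetadata(lastCommitted))))
+  }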
+}
diff --git a/core/src/main/scala/kafka/tools/OffsetClientConfig.scala b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala
new file mode 100644
index 0000000..5669ec0
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala
@@ -0,0 +1,23 @@
+package kafka.tools
+
+import kafka.common.Config
+
+object OffsetClientConfig extends Config {
+  val OffsetsChannelSocketTimeoutMs = 10000
+  val OffsetsChannelBackoffMs = 1000
+  val DefaultClientId = ""
+  val OffsetsCommitMaxRetries = 5
+}
+
+case class OffsetClientConfig(offsetsChannelSocketTimeoutMs: Int = OffsetClientConfig.OffsetsChannelSocketTimeoutMs,
+                              offsetsChannelBackoffMs: Int = OffsetClientConfig.OffsetsChannelBackoffMs,
+                              offsetsCommitRetries: Int = OffsetClientConfig.OffsetsCommitMaxRetries,
+                              clientId: String = OffsetClientConfig.DefaultClientId) {
+
+  def this(clientId: String) {
+    this(OffsetClientConfig.OffsetsChannelSocketTimeoutMs, OffsetClientConfig.OffsetsChannelBackoffMs,
+      OffsetClientConfig.OffsetsCommitMaxRetries, clientId)
+  }
+}
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 29d5a17..bd60eb8 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -24,6 +24,8 @@ import java.nio.channels._
 import java.util.concurrent.locks.{ReadWriteLock, Lock}
 import java.lang.management._
 import javax.management._
+import kafka.cluster.Broker
+
 import scala.collection._
 import scala.collection.mutable
 import java.util.Properties
@@ -566,4 +568,21 @@ object Utils extends Logging {
       case c => c
     }.mkString
   }
+
+  def parseBrokerList(brokerList: String): Seq[Broker] = {
+    if (brokerList == null)
+      Seq.empty[Broker]
+    else {
+      // Broker ids are synthetic here; use each broker's position in the list so
+      // the ids are distinct (a random Double truncated to Int is always 0).
+      brokerList.split(",").zipWithIndex.map { case (hostPort, id) =>
+        val hostAndPort = hostPort.split(":")
+        new Broker(id, hostAndPort(0), hostAndPort(1).toInt)
+      }
+    }
+  }
+
 }
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 41f334d..6bd98e6 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -1,8 +1,9 @@
 package other.kafka
+import kafka.tools.{OffsetClient, OffsetClientConfig}
 import org.I0Itec.zkclient.ZkClient
 import kafka.api._
-import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import kafka.utils.{ZkUtils, ShutdownableThread, ZKStringSerializer}
 import scala.collection._
 import kafka.client.ClientUtils
 import joptsimple.OptionParser
@@ -21,6 +22,7 @@ object TestOffsetManager {
   val random = new Random
   val SocketTimeoutMs = 10000
+  val clientConfig = OffsetClientConfig(SocketTimeoutMs)
 
   class StatsThread(reportingIntervalMs: Long, commitThreads: Seq[CommitThread], fetchThread: FetchThread)
         extends ShutdownableThread("stats-thread") {
@@ -52,7 +54,8 @@
     private val group = "group-" + id
     private val metadata = "Metadata from commit thread " + id
-    private var offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+    private val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
+    private var offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get
     private var offset = 0L
     val numErrors = new AtomicInteger(0)
     val numCommits = new
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 41f334d..6bd98e6 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -1,8 +1,9 @@
 package other.kafka
 
+import kafka.tools.{OffsetClient, OffsetClientConfig}
 import org.I0Itec.zkclient.ZkClient
 import kafka.api._
-import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import kafka.utils.{ZkUtils, ShutdownableThread, ZKStringSerializer}
 import scala.collection._
 import kafka.client.ClientUtils
 import joptsimple.OptionParser
@@ -21,6 +22,7 @@ object TestOffsetManager {
 
   val random = new Random
   val SocketTimeoutMs = 10000
+  val clientConfig = OffsetClientConfig(SocketTimeoutMs)
 
   class StatsThread(reportingIntervalMs: Long, commitThreads: Seq[CommitThread], fetchThread: FetchThread)
         extends ShutdownableThread("stats-thread") {
@@ -52,7 +54,9 @@ object TestOffsetManager {
     private val group = "group-" + id
     private val metadata = "Metadata from commit thread " + id
-    private var offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+    private val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
+    private var offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get
     private var offset = 0L
     val numErrors = new AtomicInteger(0)
     val numCommits = new AtomicInteger(0)
@@ -62,7 +66,7 @@ object TestOffsetManager {
 
     private def ensureConnected() {
       if (!offsetsChannel.isConnected)
-        offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+        offsetsChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get
     }
 
     override def doWork() {
@@ -124,7 +128,8 @@ object TestOffsetManager {
       val channel = if (channels.contains(coordinatorId))
         channels(coordinatorId)
       else {
-        val newChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+        val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
+        val newChannel = OffsetClient.getOffsetManagerChannel(clientConfig, brokers, group).get
         channels.put(coordinatorId, newChannel)
         newChannel
       }
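
The threads above call getOffsetManagerChannel(...).get, which throws if no coordinator can be located. A caller that prefers bounded retries could guard the Option instead; the sketch below assumes the Option[BlockingChannel] return type implied by those .get calls, and the retry policy is invented for illustration.

// Sketch only: bounded retry around the Option-returning coordinator lookup.
import kafka.cluster.Broker
import kafka.common.KafkaException
import kafka.network.BlockingChannel
import kafka.tools.{OffsetClient, OffsetClientConfig}

def connectToCoordinator(config: OffsetClientConfig, brokers: Seq[Broker],
                         group: String, attempts: Int = 3): BlockingChannel = {
  var remaining = attempts
  while (remaining > 0) {
    OffsetClient.getOffsetManagerChannel(config, brokers, group) match {
      case Some(channel) => return channel
      case None =>
        remaining -= 1
        Thread.sleep(config.offsetsChannelBackoffMs) // back off before the next attempt
    }
  }
  throw new KafkaException("Could not locate offset manager for group " + group)
}
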
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 2d93250..f68c708 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -18,6 +18,8 @@ package kafka.server
 
 import java.io.File
+import kafka.cluster.Broker
+import kafka.tools.{OffsetClientConfig, OffsetClient}
 import kafka.utils._
 import junit.framework.Assert._
 import java.util.Properties
@@ -39,8 +41,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   var logSize: Int = 100
   val brokerPort: Int = 9099
   val group = "test-group"
-  var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
+  var brokerList = Seq[Broker]()
 
   @Before
   override def setUp() {
@@ -49,21 +51,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
-    server = TestUtils.createServer(new KafkaConfig(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
-    val consumerMetadataRequest = ConsumerMetadataRequest(group)
-    Stream.continually {
-      val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
-      consumerMetadataResponse.coordinatorOpt.isDefined
-    }.dropWhile(success => {
-      if (!success) Thread.sleep(1000)
-      !success
-    })
+    val kafkaConfig = new KafkaConfig(config)
+    server = TestUtils.createServer(kafkaConfig, time)
+    brokerList = brokerList :+ new Broker(1, kafkaConfig.hostName, kafkaConfig.port)
   }
 
   @After
   override def tearDown() {
-    simpleConsumer.close
     server.shutdown
     Utils.rm(logDir)
     super.tearDown()
@@ -80,13 +74,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
-    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+    val clientConfig = OffsetClientConfig()
+    val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
     val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition))
-    val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+    val fetchResponse = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
     assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
@@ -97,13 +92,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
       offset=100L,
       metadata="some metadata"
     )))
-    val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
+    val commitResponse1 = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest1)
 
     assertEquals(ErrorMapping.NoError, commitResponse1.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
     val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition))
-    val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
+    val fetchResponse1 = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest1)
 
     assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
     assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
@@ -118,13 +113,15 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic3 = "topic-3"
     val topic4 = "topic-4"
 
+    val clientConfig = OffsetClientConfig()
+
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
       TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=43L, metadata="metadata two"),
       TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"),
       TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L)
     ))
-    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+    val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
@@ -138,7 +135,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
       TopicAndPartition(topic3, 1), // An unknown partition
       TopicAndPartition(topic4, 0)  // An unknown topic
     ))
-    val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+    val fetchResponse = OffsetClient.fetchOffsets(clientConfig, brokerList, group, fetchRequest)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
@@ -168,12 +165,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(1))
     createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
+    val clientConfig = OffsetClientConfig()
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize)
     )))
-    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+    val commitResponse = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
 
@@ -181,7 +179,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
    )))
-    val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
+    val commitResponse1 = OffsetClient.commitOffsets(clientConfig, brokerList, group, commitRequest1)
 
     assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
-- 
1.9.3 (Apple Git-50)