From 025f36f6f0db94aa4b126bb427dae08f841187a8 Mon Sep 17 00:00:00 2001
From: mgharat
Date: Sun, 7 Dec 2014 11:16:10 -0800
Subject: [PATCH 1/3] OffsetClient

---
 core/src/main/scala/kafka/tools/OffsetClient.scala | 302 +++++++++++++++++++++
 1 file changed, 302 insertions(+)
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClient.scala

diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
new file mode 100644
index 0000000..17e5828
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -0,0 +1,302 @@
+package kafka.tools
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+import kafka.api._
+import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition}
+import kafka.network.BlockingChannel
+import kafka.utils.Logging
+import kafka.cluster.Broker
+import scala.util.Random
+
+/**
+ * A utility that provides APIs to fetch and commit offsets
+ *
+ */
+object OffsetClient extends Logging {
+
+  /**
+   * Returns the given config, or a config holding the OffsetClientConfig defaults when the caller passed null.
+   */
+  private def configOrDefault(config: OffsetClientConfig): OffsetClientConfig =
+    Option(config).getOrElse(OffsetClientConfig())
+
+  /**
+   * Connects to one of the given brokers, trying them in random order and cycling through the list until
+   * a connection succeeds. Does not return until a channel is connected.
+   *
+   * @param config  client settings; null selects the OffsetClientConfig defaults
+   * @param brokers candidate brokers; must not be empty
+   * @return a connected channel to one of the brokers
+   */
+  private def getQueryChannel(config: OffsetClientConfig, brokers: Seq[Broker]): BlockingChannel = {
+    require(brokers.nonEmpty, "At least one broker is required to create a query channel")
+    val socketTimeoutMs = configOrDefault(config).offsetsChannelSocketTimeoutMs
+    val shuffledBrokers = Random.shuffle(brokers)
+    var channel: BlockingChannel = null
+    var connected = false
+    var i: Int = 0
+
+    while (!connected) {
+      val broker = shuffledBrokers(i)
+      i = (i + 1) % shuffledBrokers.size
+      trace("Connecting to broker...")
+      try {
+        channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+        channel.connect()
+        debug("Created channel to broker [%s:%d].".format(channel.host, channel.port))
+      }
+      catch {
+        case e: Exception =>
+          if (channel != null) channel.disconnect()
+          channel = null
+          error("Error while creating channel to [%s:%d]. Retrying...".format(broker.host, broker.port), e)
+      }
+      // channel is reset to null after a failed attempt, so it must not be dereferenced unguarded
+      connected = channel != null && channel.isConnected
+    }
+
+    channel
+  }
+
+  /**
+   * Creates a blocking channel to the offset manager of the given group
+   *
+   * The group's coordinator is looked up with a ConsumerMetadataRequest on a query channel; the query channel
+   * is reused when it already points at the coordinator, otherwise a new channel is opened. Both steps are
+   * retried until they succeed, so this method does not return until a channel is available.
+   *
+   * @param config  client settings; null selects the OffsetClientConfig defaults
+   * @param brokers brokers that can be queried for the group's offset manager
+   * @param group   consumer group whose offset manager is wanted
+   * @return the channel to the offset manager; always defined when this method returns
+   */
+  def getOffsetManagerChannel(config: OffsetClientConfig, brokers: Seq[Broker], group: String): Option[BlockingChannel] = {
+    val resolvedConfig = configOrDefault(config)
+    val offsetsChannelBackoffMs = resolvedConfig.offsetsChannelBackoffMs
+    val socketTimeoutMs = resolvedConfig.offsetsChannelSocketTimeoutMs
+    var offSetManagerChannel: BlockingChannel = null
+    var queryChannel = getQueryChannel(config, brokers)
+    var offSetManagerChannelOpt: Option[BlockingChannel] = None
+
+    while (!offSetManagerChannelOpt.isDefined) {
+
+      var offsetManagerOpt: Option[Broker] = None
+
+      while (!offsetManagerOpt.isDefined) {
+        try {
+          if (!queryChannel.isConnected) {
+            queryChannel = getQueryChannel(config, brokers)
+          }
+          debug("Querying [%s:%d] to locate offset manager for group [%s].".format(queryChannel.host, queryChannel.port, group))
+          // read the client id from the resolved config: the caller may have passed null
+          queryChannel.send(ConsumerMetadataRequest(group, ConsumerMetadataRequest.CurrentVersion, 0, resolvedConfig.clientId))
+          val response = queryChannel.receive()
+          val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
+          debug("Consumer metadata response: " + consumerMetadataResponse)
+          if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) {
+            offsetManagerOpt = consumerMetadataResponse.coordinatorOpt
+          }
+          else {
+            debug("Query to [%s:%d] to locate offset manager for group [%s] failed - will retry in %d milliseconds."
+              .format(queryChannel.host, queryChannel.port, group, offsetsChannelBackoffMs))
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+          }
+        }
+        catch {
+          case ioe: IOException =>
+            error("Error while connecting to [%s:%d] for fetching consumer metadata".format(queryChannel.host, queryChannel.port))
+            queryChannel.disconnect()
+        }
+      }
+
+      val offsetManager = offsetManagerOpt.get
+      if (offsetManager.host == queryChannel.host && offsetManager.port == queryChannel.port) {
+        offSetManagerChannelOpt = Some(queryChannel)
+      }
+      else {
+        val connectString = "[%s:%d]".format(offsetManager.host, offsetManager.port)
+        try {
+          offSetManagerChannel = new BlockingChannel(offsetManager.host, offsetManager.port, BlockingChannel.UseDefaultBufferSize,
+            BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+          offSetManagerChannel.connect()
+          offSetManagerChannelOpt = Some(offSetManagerChannel)
+          info("Connected to offset manager [%s:%d] for group [%s].".format(offSetManagerChannel.host, offSetManagerChannel.port, group))
+          queryChannel.disconnect()
+        }
+        catch {
+          case ioe: IOException =>
+            error("Error while connecting to %s.".format(connectString))
+            if (offSetManagerChannel != null) offSetManagerChannel.disconnect()
+            TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+            offSetManagerChannelOpt = None
+        }
+      }
+    }
+
+    offSetManagerChannelOpt
+  }
+
+  /**
+   * Fetches metadata for the given topics, making at most one attempt per entry in brokers.
+   *
+   * @param config  client settings; null selects the OffsetClientConfig defaults
+   * @param brokers brokers that can be queried for metadata
+   * @param topics  topics to fetch metadata for
+   * @return the metadata response of the first successful attempt
+   * @throws KafkaException if every attempt failed; its cause is the last error seen
+   */
+  def fetchTopicMetadata(config: OffsetClientConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = {
+    val resolvedConfig = configOrDefault(config)
+    var fetchMetadataSucceeded = false
+    var i: Int = 0
+    var topicMetadataChannel = getQueryChannel(config, brokers)
+    var topicMetaDataResponse: TopicMetadataResponse = null
+    var t: Throwable = null
+
+    while (i < brokers.size && !fetchMetadataSucceeded) {
+      try {
+        if (!topicMetadataChannel.isConnected) {
+          topicMetadataChannel = getQueryChannel(config, brokers)
+        }
+        info("Fetching metadata from broker [%s:%d]".format(topicMetadataChannel.host, topicMetadataChannel.port))
+        val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, resolvedConfig.clientId, topics)
+        topicMetadataChannel.send(topicMetaDataRequest)
+        topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer)
+        fetchMetadataSucceeded = true
+        topicMetadataChannel.disconnect()
+      }
+      catch {
+        case e: Exception =>
+          warn("Fetching metadata for topics %s from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port))
+          topicMetadataChannel.disconnect()
+          t = e
+      }
+      finally
+      {
+        i = i + 1
+      }
+
+      // back off only when another attempt will follow; success and the final failure leave the loop at once
+      if (!fetchMetadataSucceeded && i < brokers.size) {
+        TimeUnit.MILLISECONDS.sleep(resolvedConfig.offsetsChannelBackoffMs)
+      }
+    }
+
+    if(!fetchMetadataSucceeded) {
+      throw new KafkaException("fetching topic metadata for topics %s from broker [%s] failed".format(topics, brokers), t)
+    } else {
+      debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
+    }
+
+    topicMetaDataResponse
+  }
+
+  /**
+   * Fetches the offsets of the given group, retrying until the offset manager returns a response in which
+   * no partition reports NotCoordinatorForConsumerCode or OffsetsLoadInProgressCode.
+   *
+   * @param config             client settings; null selects the OffsetClientConfig defaults
+   * @param brokers            brokers that can be queried for the group's offset manager
+   * @param group              consumer group whose offsets are fetched
+   * @param offsetFetchRequest request to send to the offset manager
+   * @return the offset fetch response
+   */
+  def fetchOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetFetchRequest: OffsetFetchRequest): OffsetFetchResponse = {
+    val offsetsChannelBackoffMs = configOrDefault(config).offsetsChannelBackoffMs
+    var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
+    while (!offsetFetchResponseOpt.isDefined) {
+      val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get
+      try {
+        offsetManagerChannel.send(offsetFetchRequest)
+        val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer)
+        trace("Offset fetch response: %s.".format(offsetFetchResponse))
+
+        val (leaderChanged, loadInProgress) =
+          offsetFetchResponse.requestInfo.foldLeft(false, false) { case (folded, (topicPartition, offsetMetadataAndError)) =>
+            (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
+              folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+          }
+
+        if (leaderChanged) {
+          debug("Could not fetch offsets (because offset manager has moved).")
+        }
+        else if (loadInProgress) {
+          debug("Could not fetch offsets (because offset cache is being loaded).")
+        }
+        else {
+          offsetFetchResponseOpt = Some(offsetFetchResponse)
+        }
+      }
+      catch {
+        case e: Exception =>
+          warn("Error while fetching offsets from [%s:%d]".format(offsetManagerChannel.host, offsetManagerChannel.port), e)
+      }
+      finally
+      {
+        // every attempt obtains its own channel above, so release it on every path
+        if(offsetManagerChannel.isConnected)
+          offsetManagerChannel.disconnect()
+      }
+
+      if (offsetFetchResponseOpt.isEmpty) {
+        debug("Retrying offset fetch in %d ms".format(offsetsChannelBackoffMs))
+        TimeUnit.MILLISECONDS.sleep(offsetsChannelBackoffMs)
+      }
+    }
+
+    offsetFetchResponseOpt.get
+  }
+
+  /**
+   * Sends the offset commit request to the group's offset manager and returns its response. A single
+   * attempt is made; per-partition error codes are left in the returned response for the caller to inspect.
+   *
+   * @param config              client settings; null selects the OffsetClientConfig defaults
+   * @param brokers             brokers that can be queried for the group's offset manager
+   * @param group               consumer group whose offsets are committed
+   * @param offsetCommitRequest request to send to the offset manager
+   * @return the offset commit response
+   * @throws KafkaException if anything fails while sending the request or handling the response
+   */
+  def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], group: String, offsetCommitRequest: OffsetCommitRequest): OffsetCommitResponse = {
+    var offsetCommitResponse: OffsetCommitResponse = null
+
+    val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers, group).get
+    try {
+      offsetManagerChannel.send(offsetCommitRequest)
+      offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer)
+      trace("Offset commit response: %s.".format(offsetCommitResponse))
+
+      val errorCodes = offsetCommitResponse.commitStatus.toSeq.map { case (topicPartition, errorCode) => errorCode }
+      val errorCount = errorCodes.count(_ != ErrorMapping.NoError)
+      val shouldRefreshCoordinator = errorCodes.exists { errorCode =>
+        errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+          errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode
+      }
+      debug(errorCount + " errors in offset commit response.")
+
+      if (shouldRefreshCoordinator) {
+        debug("Could not commit offsets (because offset manager has moved or is unavailable).")
+      }
+    }
+    catch {
+      case t: Throwable =>
+        error("Error while committing offsets.", t)
+        throw new KafkaException(t)
+    }
+    finally
+    {
+      // the channel serves this one request only, so release it on every path
+      if(offsetManagerChannel.isConnected)
+        offsetManagerChannel.disconnect()
+    }
+
+    offsetCommitResponse
+  }
+
+  // def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], offsetCommitRequest: kafka.javaapi.OffsetCommitRequest, group: String): Boolean = {
+  //   return commitOffsets(config, brokers, offsetCommitRequest.underlying, group)
+  // }
+}
-- 
1.9.3 (Apple Git-50)

From 8e6cb5b1bfd43a367cbe9c25c6896afed8125b5e Mon Sep 17 00:00:00 2001
From: mgharat
Date: Sun, 7 Dec 2014 11:28:01 -0800
Subject: [PATCH 2/3] OffsetClient config for the OffsetClient

---
 .../scala/kafka/tools/OffsetClientConfig.scala     | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClientConfig.scala

diff --git a/core/src/main/scala/kafka/tools/OffsetClientConfig.scala b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala
new file mode 100644
index 0000000..b0df623
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClientConfig.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.common.Config
+
+/**
+ * Default values for the settings of OffsetClientConfig
+ */
+object OffsetClientConfig extends Config {
+  val OffsetsChannelSocketTimeoutMs = 100000
+  val OffsetsChannelBackoffMs = 1000
+  val DefaultClientId = ""
+  val OffsetsCommitMaxRetries = 5
+}
+
+/**
+ * Settings used by OffsetClient
+ *
+ * @param offsetsChannelSocketTimeoutMs timeout, in milliseconds, handed to the BlockingChannels the client opens
+ * @param offsetsChannelBackoffMs       time, in milliseconds, the client sleeps before retrying a failed request
+ * @param offsetsCommitRetries          commit retry limit (NOTE: OffsetClient does not read this in this series)
+ * @param clientId                      client id placed in the metadata requests the client builds
+ */
+case class OffsetClientConfig(offsetsChannelSocketTimeoutMs: Int = OffsetClientConfig.OffsetsChannelSocketTimeoutMs,
+                              offsetsChannelBackoffMs: Int = OffsetClientConfig.OffsetsChannelBackoffMs,
+                              offsetsCommitRetries: Int = OffsetClientConfig.OffsetsCommitMaxRetries,
+                              clientId: String = OffsetClientConfig.DefaultClientId) {
+
+  /** Creates a config with the given client id and the default value for every other setting. */
+  def this(clientId: String) {
+    this(OffsetClientConfig.OffsetsChannelSocketTimeoutMs, OffsetClientConfig.OffsetsChannelBackoffMs,
+      OffsetClientConfig.OffsetsCommitMaxRetries, clientId)
+  }
+}
+
-- 
1.9.3 (Apple Git-50)

From c3f7d6b516ce7c4221917395817ba7a5ff1a6581 Mon Sep 17 00:00:00 2001
From: mgharat
Date: Sun, 7 Dec 2014 11:42:41 -0800
Subject: [PATCH 3/3] OffsetClient for fetching and importing offsets from kafka

---
 core/src/main/scala/kafka/tools/OffsetClient.scala | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
index 17e5828..dbbfd4a 100644
--- a/core/src/main/scala/kafka/tools/OffsetClient.scala
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package kafka.tools
 
 import java.io.IOException
@@ -295,8 +312,4 @@ object OffsetClient extends Logging {
 
     offsetCommitResponse
   }
-
-  // def commitOffsets(config: OffsetClientConfig = null, brokers: Seq[Broker], offsetCommitRequest: kafka.javaapi.OffsetCommitRequest, group: String): Boolean = {
-  //   return commitOffsets(config, brokers, offsetCommitRequest.underlying, group)
-  // }
 }
-- 
1.9.3 (Apple Git-50)