From 4c42f9baabf54284cc084213de6dc79b03bc8b3d Mon Sep 17 00:00:00 2001
From: mgharat
Date: Mon, 15 Sep 2014 15:12:41 -0700
Subject: [PATCH] SRE Offset tool + Offset Client

---
 config/consumer.properties                          |   3 +
 core/src/main/scala/kafka/tools/OffsetClient.scala  | 270 +++++++++++++++++++++
 .../src/main/scala/kafka/tools/SreOffsetTool.scala  | 128 ++++++++++
 3 files changed, 401 insertions(+)
 create mode 100644 core/src/main/scala/kafka/tools/OffsetClient.scala
 create mode 100644 core/src/main/scala/kafka/tools/SreOffsetTool.scala

diff --git a/config/consumer.properties b/config/consumer.properties
index 83847de..2fc7fb7 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -27,3 +27,6 @@ group.id=test-consumer-group
 
 #consumer timeout
 #consumer.timeout.ms=5000
+
+#offset storage: zookeeper or kafka
+offsets.storage=kafka
diff --git a/core/src/main/scala/kafka/tools/OffsetClient.scala b/core/src/main/scala/kafka/tools/OffsetClient.scala
new file mode 100644
index 0000000..c62a31c
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/OffsetClient.scala
@@ -0,0 +1,270 @@
+package kafka.tools
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+import kafka.api._
+import kafka.cluster.Broker
+import kafka.common.{OffsetAndMetadata, TopicAndPartition, ErrorMapping}
+import kafka.consumer.ConsumerConfig
+import kafka.network.BlockingChannel
+import kafka.utils.Logging
+
+import scala.util.Random
+
+
+/**
+ * Created by mgharat on 9/11/14.
+ *
+ * Thin client for fetching and committing consumer offsets through the
+ * Kafka-based offset manager, without creating a full consumer.
+ */
+object OffsetClient extends Logging {
+
+  private def getQueryChannel(config: ConsumerConfig, brokers: Seq[Broker]): BlockingChannel = {
+    var channel: BlockingChannel = null
+    var connected = false
+
+    // Keep picking a random broker until a channel can be established.
+    while (!connected) {
+      val broker = Random.shuffle(brokers).head
+      trace("Connecting to the broker %s:%d.".format(broker.host, broker.port))
+      try {
+        channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize,
+          BlockingChannel.UseDefaultBufferSize, config.socketTimeoutMs)
+        channel.connect()
+        debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
+        connected = true
+      }
+      catch {
+        case e: Exception =>
+          if (channel != null) channel.disconnect()
+          channel = null
+          info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
+      }
+    }
+
+    channel
+  }
+
+  private def getOffsetManagerChannel(config: ConsumerConfig, brokers: Seq[Broker]): Option[BlockingChannel] = {
+    var offSetManagerChannel: BlockingChannel = null
+
+    var queryChannel = getQueryChannel(config, brokers)
+
+    var offSetManagerChannelOpt: Option[BlockingChannel] = None
+
+    while (!offSetManagerChannelOpt.isDefined) {
+
+      var coordinatorOpt: Option[Broker] = None
+
+      // Ask any broker for the offset manager (coordinator) of this consumer group.
+      while (!coordinatorOpt.isDefined) {
+        try {
+          if (!queryChannel.isConnected) {
+            queryChannel = getQueryChannel(config, brokers)
+          }
+          debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, config.groupId))
+          queryChannel.send(ConsumerMetadataRequest(config.groupId))
+          val response = queryChannel.receive()
+          val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
+          debug("Consumer metadata response: " + consumerMetadataResponse.toString)
+          if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) {
+            coordinatorOpt = consumerMetadataResponse.coordinatorOpt
+          }
+          else {
+            debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
+              .format(queryChannel.host, queryChannel.port, config.groupId, config.offsetsChannelBackoffMs))
+            Thread.sleep(config.offsetsChannelBackoffMs)
+          }
+        }
+        catch {
+          case ioe: IOException =>
+            info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
+            queryChannel.disconnect()
+        }
+      }
+
+      // Reuse the query channel if it already points at the coordinator, otherwise connect to the coordinator.
+      val coordinator = coordinatorOpt.get
+      if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) {
+        offSetManagerChannelOpt = Some(queryChannel)
+      }
+      else {
+        val connectString = "%s:%d".format(coordinator.host, coordinator.port)
+        try {
+          offSetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port, BlockingChannel.UseDefaultBufferSize,
+            BlockingChannel.UseDefaultBufferSize, config.socketTimeoutMs)
+          offSetManagerChannel.connect()
+          offSetManagerChannelOpt = Some(offSetManagerChannel)
+          queryChannel.disconnect()
+        }
+        catch {
+          case ioe: IOException =>
+            info("Error while connecting to %s.".format(connectString))
+            if (offSetManagerChannel != null) offSetManagerChannel.disconnect()
+            TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs)
+            offSetManagerChannelOpt = None
+        }
+      }
+    }
+
+    offSetManagerChannelOpt
+  }
+
+  private def fetchTopicMetadata(config: ConsumerConfig, brokers: Seq[Broker], topics: Seq[String]): TopicMetadataResponse = {
+    var fetchMetadataSucceeded = false
+    var topicMetadataChannel = getQueryChannel(config, brokers)
+    var topicMetaDataResponse: TopicMetadataResponse = null
+
+    while (!fetchMetadataSucceeded) {
+      try {
+        if (!topicMetadataChannel.isConnected) {
+          topicMetadataChannel = getQueryChannel(config, brokers)
+        }
+        info("Fetching metadata from broker %s:%d".format(topicMetadataChannel.host, topicMetadataChannel.port))
+        val topicMetaDataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
+        topicMetadataChannel.send(topicMetaDataRequest)
+        topicMetaDataResponse = TopicMetadataResponse.readFrom(topicMetadataChannel.receive().buffer)
+        fetchMetadataSucceeded = true
+      }
+      catch {
+        case e: Exception =>
+          warn("Fetching metadata for topics [%s] from broker [%s:%d] failed".format(topics, topicMetadataChannel.host, topicMetadataChannel.port))
+          topicMetadataChannel.disconnect()
+      }
+    }
+
+    topicMetaDataResponse
+  }
+
+
+  /**
+   * Fetch the committed offsets of the configured consumer group for all partitions
+   * of the given topics.
+   */
+  def fetch(config: ConsumerConfig, brokers: Seq[Broker], topics: Seq[String]): Option[OffsetFetchResponse] = {
+    var topicAndPartitions = collection.Seq[TopicAndPartition]()
+
+    if (topics.size > 0) {
+      val topicMetaDataResponse: TopicMetadataResponse = fetchTopicMetadata(config, brokers, topics)
+      topicAndPartitions = topicMetaDataResponse.topicsMetadata.flatMap(topicMetaData => {
+        val topic = topicMetaData.topic
+        topicMetaData.partitionsMetadata.map(partitionMetadata =>
+          new TopicAndPartition(topic, partitionMetadata.partitionId)
+        )
+      })
+    }
+
+    val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = topicAndPartitions, clientId = config.clientId)
+    var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
+    while (!offsetFetchResponseOpt.isDefined) {
+      val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get
+      try {
+        offsetManagerChannel.send(offsetFetchRequest)
+        val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetManagerChannel.receive().buffer)
+        trace("Offset fetch response: %s.".format(offsetFetchResponse))
+
+        val (leaderChanged, loadInProgress) =
+          offsetFetchResponse.requestInfo.foldLeft((false, false)) { case (folded, (topicPartition, offsetMetadataAndError)) =>
+            (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
+             folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+          }
+
+        if (leaderChanged) {
+          offsetManagerChannel.disconnect()
+          debug("Could not fetch offsets (because offset manager has moved).")
+        }
+        else if (loadInProgress) {
+          debug("Could not fetch offsets (because offset cache is being loaded).")
+        }
+        else {
+          offsetFetchResponseOpt = Some(offsetFetchResponse)
+        }
+      }
+      catch {
+        case e: Exception =>
+          warn("Error while fetching offsets from %s:%d. Possible cause: %s".format(offsetManagerChannel.host, offsetManagerChannel.port, e.getMessage))
+      }
+
+      if (offsetFetchResponseOpt.isEmpty) {
+        debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs))
+        TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs)
+      }
+    }
+
+    offsetFetchResponseOpt
+  }
+
+  /**
+   * Commit the given offsets for the configured consumer group, retrying up to
+   * offsets.commit.max.retries times. Returns true if the commit succeeded.
+   */
+  def commit(config: ConsumerConfig, brokers: Seq[Broker], offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata]): Boolean = {
+    var done = false
+    var retriesRemaining = 1 + config.offsetsCommitMaxRetries
+    var committed = false
+
+    if (offsetsToCommit.size > 0) {
+      val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId)
+
+      while (!done) {
+        val offsetManagerChannel: BlockingChannel = getOffsetManagerChannel(config, brokers).get
+        try {
+          offsetManagerChannel.send(offsetCommitRequest)
+          val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetManagerChannel.receive().buffer)
+          trace("Offset commit response: %s.".format(offsetCommitResponse))
+
+          val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
+            offsetCommitResponse.commitStatus.foldLeft((false, false, false, 0)) { case (folded, (topicPartition, errorCode)) =>
+
+              (folded._1 || // update commitFailed
+                errorCode != ErrorMapping.NoError,
+
+               folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
+                (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+
+               folded._3 || // update shouldRefreshCoordinator
+                errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+                errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+
+               // update error count
+               folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+            }
+          }
+          debug(errorCount + " errors in offset commit response.")
+
+          if (shouldRefreshCoordinator) {
+            debug("Could not commit offsets (because offset coordinator has moved or is unavailable).")
+            offsetManagerChannel.disconnect()
+          }
+
+          if (commitFailed && retryableIfFailed) {
+            committed = false
+          }
+          else {
+            committed = true
+          }
+        }
+        catch {
+          case t: Throwable =>
+            error("Error while committing offsets.", t)
+            offsetManagerChannel.disconnect()
+        }
+
+        done = {
+          retriesRemaining -= 1
+          retriesRemaining == 0 || committed
+        }
+
+        if (!done) {
+          debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs))
+          TimeUnit.MILLISECONDS.sleep(config.offsetsChannelBackoffMs)
+        }
+      }
+    }
+
+    committed
+  }
+}
+
+
diff --git a/core/src/main/scala/kafka/tools/SreOffsetTool.scala b/core/src/main/scala/kafka/tools/SreOffsetTool.scala
new file mode 100644
index 0000000..ccc524d
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/SreOffsetTool.scala
@@ -0,0 +1,128 @@
+package kafka.tools
+
+import java.io.{FileWriter, BufferedWriter, File}
+import joptsimple.OptionParser
+import kafka.api.OffsetFetchResponse
+import kafka.cluster.Broker
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.consumer.ConsumerConfig
+import kafka.utils.{Utils, Logging}
+
+import scala.io.Source
+
+
+/**
+ * Created by mgharat on 9/10/14.
+ *
+ * Command-line tool that either dumps the committed offsets of a consumer group
+ * to a file (read mode) or commits offsets read from a file (write mode).
+ */
+object SreOffsetTool extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    info("Starting SreOffsetTool")
+
+    val parser = new OptionParser
+
+    val topicList = parser.accepts("topics", "Topics for which offsets/partitions should be fetched")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val brokerList = parser.accepts("broker-list", "List of brokers")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val filePathArg = parser.accepts("filepath", "Path to the read/write file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    val readWrite = parser.accepts("readWrite", "Read/write flag, 0: read, 1: write")
+      .withRequiredArg()
+      .ofType(classOf[Integer])
+
+    val consumerConfigFile = parser.accepts("config-file", "Consumer config file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+
+    val options = parser.parse(args: _*)
+
+    val hostAndPort = options.valueOf(brokerList).split(",")
+
+    val brokers = hostAndPort.zipWithIndex.map { case (hostPort, brokerId) =>
+      val temp = hostPort.split(":")
+      val host = temp(0)
+      val port = temp(1).toInt
+      // The broker id is only used locally, so the position in the list is good enough.
+      new Broker(brokerId, host, port)
+    }
+
+    val topics = options.valueOf(topicList).split(",").toSeq
+
+    val filePath = options.valueOf(filePathArg)
+
+    //Consumer config properties
+    val configFile = options.valueOf(consumerConfigFile)
+    val consumerConfig = new ConsumerConfig(Utils.loadProps(configFile))
+
+    val rwFlag = options.valueOf(readWrite)
+
+    val offSetTool = new OffsetTool(consumerConfig, brokers)
+    offSetTool.readWrite(rwFlag, filePath, topics)
+    println("rwFlag : " + rwFlag)
+  }
+
+  /**
+   *
+   * @param config Consumer configuration
+   * @param brokers Brokers in the cluster
+   */
+  private class OffsetTool(val config: ConsumerConfig, brokers: Seq[Broker]) {
+
+    /**
+     *
+     * @param rwFlag 0 = read, 1 = write
+     * @param fileName Path to the file to write to or read from
+     * @param topics Topics whose partition offsets should be fetched
+     */
+    def readWrite(rwFlag: Int, fileName: String, topics: Seq[String] = null) {
+      rwFlag match {
+        //Read from the offset manager and write to a file
+        case 0 =>
+          val file = new File(fileName)
+          var offSetFetchResponseOpt: Option[OffsetFetchResponse] = None
+
+          while (!offSetFetchResponseOpt.isDefined) {
+            offSetFetchResponseOpt = OffsetClient.fetch(config, brokers, topics)
+          }
+          val offsetFetchResponse = offSetFetchResponseOpt.get
+          offsetFetchResponse.requestInfo.foreach(x => println(x._1.topic + " : " + x._1.partition + " : " + x._2.offset))
+          val topicPartitionOffsetMap = offsetFetchResponse.requestInfo
+
+          //Writing to the file, one topic/partition/offset entry per line
+          val writer = new BufferedWriter(new FileWriter(file))
+          topicPartitionOffsetMap.foreach(tpo => {
+            val topic = tpo._1.topic
+            val partition = tpo._1.partition
+            val offset = tpo._2.offset
+            writer.write(topic + "/" + partition + "/" + offset)
+            writer.newLine()
+          })
+          writer.flush()
+          writer.close()
+
+        //Read from the file and write to the offset manager
+        case 1 =>
+          //Reading from the file line by line. Each line is of the form topic/partition/offset
+          val fileLines = Source.fromFile(fileName).getLines().toList
+          val offsetsToCommit = fileLines.map(line => {
+            val topicPartitionOffset = line.split("/")
+            val topicAndPartition = new TopicAndPartition(topicPartitionOffset(0), topicPartitionOffset(1).toInt)
+            val offsetMetadata = new OffsetAndMetadata(topicPartitionOffset(2).toLong)
+            println(topicAndPartition + " -> " + offsetMetadata)
+            (topicAndPartition -> offsetMetadata)
+          }).toMap
+
+          println(OffsetClient.commit(config, brokers, offsetsToCommit))
+      }
+    }
+  }
+
+}
-- 
1.9.3 (Apple Git-50)
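
Usage sketch (illustrative only, not part of the patch): the snippet below shows one way the OffsetClient added above could be driven directly from Scala, without going through SreOffsetTool. The broker address (localhost:9092), topic name ("test-topic"), properties path, and the OffsetClientExample object name are assumptions made up for this example; the consumer group id comes from whatever consumer config file is loaded.

    import kafka.cluster.Broker
    import kafka.consumer.ConsumerConfig
    import kafka.tools.OffsetClient
    import kafka.utils.Utils

    object OffsetClientExample {
      def main(args: Array[String]): Unit = {
        // Assumed values for illustration; point these at a real cluster and config file.
        val config = new ConsumerConfig(Utils.loadProps("config/consumer.properties"))
        val brokers = Seq(new Broker(0, "localhost", 9092))
        val topics = Seq("test-topic")

        // Fetch the committed offsets for the group named in the config file and print them.
        OffsetClient.fetch(config, brokers, topics).foreach { response =>
          response.requestInfo.foreach { case (tp, meta) =>
            println("%s-%d -> %d".format(tp.topic, tp.partition, meta.offset))
          }
        }
      }
    }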