Index: core/src/main/scala/kafka/tools/UpdateZkOffsetsFromFile.scala =================================================================== --- core/src/main/scala/kafka/tools/UpdateZkOffsetsFromFile.scala (revision 0) +++ core/src/main/scala/kafka/tools/UpdateZkOffsetsFromFile.scala (revision 0) @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.io.BufferedReader +import java.io.FileReader +import joptsimple._ +import kafka.consumer.{SimpleConsumer, ConsumerConfig} +import kafka.utils.{Logging, ZkUtils,Utils,ZKStringSerializer,ZKGroupTopicDirs} +import org.I0Itec.zkclient.ZkClient + + +/** + * A utility that updates the offset of broker partitions in ZK. + * + * This utility expects 2 input files as arguments: + * 1. consumer properties file + * 2. a file contains partition offsets data such as: + * (This output data file can be obtained by running kafka.tools.GetZkOffsets) + * + * /consumers/group1/offsets/topic1/3-0:285038193 + * /consumers/group1/offsets/topic1/1-0:286894308 + * + * To print debug message, add the following line to log4j.properties: + * log4j.logger.kafka.tools.UpdateZkOffsetsFromFile$=DEBUG + * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin) + */ +object UpdateZkOffsetsFromFile extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + + val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.") + .withRequiredArg() + .defaultsTo("localhost:2181") + .ofType(classOf[String]) + val groupOpt = parser.accepts("group", "Consumer group.") + .withRequiredArg() + .ofType(classOf[String]) + val inFileOpt = parser.accepts("input-file", "Input file") + .withRequiredArg() + .ofType(classOf[String]) + parser.accepts("help", "Print this message.") + + val options = parser.parse(args : _*) + + if (options.has("help")) { + parser.printHelpOn(System.out) + System.exit(0) + } + + for (opt <- List(groupOpt, inFileOpt)) { + if (!options.has(opt)) { + System.err.println("Missing required argument: %s".format(opt)) + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val zkConnect = options.valueOf(zkConnectOpt) + val groupId = options.valueOf(groupOpt) + val partitionOffsetFile = options.valueOf(inFileOpt) + + val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile, groupId) + + updateZkOffsets(zkClient, partitionOffsets) + } + + private def getPartitionOffsetsFromFile(filename: String, groupId: String):Map[String,String] = { + val fr = new FileReader(filename) + val br = new BufferedReader(fr) + var partOffsetsMap: Map[String,String] = Map() + + var s: String = br.readLine() + while ( s != null && s.length() >= 1) { + val tokens = s.split(":") + + // tokens: /consumers/group1/offsets/topic1/0-0:1234 + val grp = tokens(0).split("/")(2) + + if ( grp.equals(groupId) ) { + val nodePath = tokens(0) + partOffsetsMap += nodePath -> tokens(1) + debug("adding node path [" + s + "]") + } + + s = br.readLine() + } + + return partOffsetsMap + } + + private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = { + val cluster = ZkUtils.getCluster(zkClient) + var partitions: List[String] = Nil + + for ((partition, offset) <- partitionOffsets) { + debug("updating [" + partition + "] with offset [" + offset + "]") + + try { + ZkUtils.updatePersistentPath(zkClient, partition, offset.toString) + } catch { + case e => e.printStackTrace() + } + } + } +}