Index: core/src/main/scala/kafka/tools/UpdateZkOffsetsFromFile.scala
===================================================================
--- core/src/main/scala/kafka/tools/UpdateZkOffsetsFromFile.scala	(revision 0)
+++ core/src/main/scala/kafka/tools/UpdateZkOffsetsFromFile.scala	(revision 0)
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.BufferedReader
+import java.io.FileReader
+
+import kafka.consumer.ConsumerConfig
+import kafka.utils.{ZkUtils, Utils, ZKStringSerializer}
+
+import org.I0Itec.zkclient.ZkClient
+import org.apache.log4j.Logger
+
+
+/**
+ * A utility that updates the offsets of broker partitions in ZK.
+ *
+ * This utility expects 3 arguments:
+ *   1. a consumer properties file
+ *   2. the consumer group id whose offsets should be updated
+ *   3. a file containing partition offset data such as:
+ *      (this file can be obtained by running kafka.tools.GetZkOffsets)
+ *
+ *      test01/3-0:285038193
+ *      test01/1-0:286894308
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ *   log4j.logger.kafka.tools.UpdateZkOffsetsFromFile$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+object UpdateZkOffsetsFromFile {
+
+  protected val logger = Logger.getLogger(getClass())
+
+  def main(args: Array[String]) {
+    if (args.length != 3)
+      usage()
+
+    val consumerPropFile = args(0)
+    val groupId = args(1)
+    val partitionOffsetFile = args(2)
+
+    val config = new ConsumerConfig(Utils.loadProps(consumerPropFile))
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    val partitionOffsets: Map[String, String] = getPartitionOffsetsFromFile(partitionOffsetFile, groupId)
+
+    updateZkOffsets(zkClient, config, partitionOffsets)
+  }
+
+  private def getPartitionOffsetsFromFile(filename: String, groupId: String): Map[String, String] = {
+    val br = new BufferedReader(new FileReader(filename))
+    var partOffsetsMap: Map[String, String] = Map()
+
+    // each line has the form <topic>/<brokerId>-<partitionId>:<offset>
+    var s: String = br.readLine()
+    while (s != null && s.length() >= 1) {
+      val tokens = s.split(":")
+      val nodePath = "/consumers/" + groupId + "/offsets/" + tokens(0)
+
+      partOffsetsMap += nodePath -> tokens(1)
+
+      if (logger.isDebugEnabled())
+        logger.debug("adding node path [" + s + "]")
+
+      s = br.readLine()
+    }
+    br.close()
+
+    partOffsetsMap
+  }
+
+  private def updateZkOffsets(zkClient: ZkClient, config: ConsumerConfig, partitionOffsets: Map[String, String]): Unit = {
+    for ((partition, offset) <- partitionOffsets) {
+      if (logger.isDebugEnabled())
+        logger.debug("updating [" + partition + "] with offset [" + offset + "]")
+
+      try {
+        ZkUtils.updatePersistentPath(zkClient, partition, offset)
+      } catch {
+        case e: Throwable => e.printStackTrace()
+      }
+    }
+  }
+
+  private def usage() = {
+    println("USAGE: " + this.getClass.getName + " <consumer.properties file> <consumer group id> <partition offsets file>")
+    System.exit(1)
+  }
+}
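
For reference, a hypothetical invocation (the group id, file names, and the kafka-run-class.sh launcher are illustrative, not part of this patch) might look like:

    bin/kafka-run-class.sh kafka.tools.UpdateZkOffsetsFromFile consumer.properties test-group offsets.txt

where offsets.txt contains one topic/brokerId-partitionId:offset entry per line, in the same format as the example shown in the scaladoc above (e.g. test01/3-0:285038193), such as the output produced by kafka.tools.GetZkOffsets.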