Index: core/src/main/scala/kafka/tools/GetZkOffsets.scala
===================================================================
--- core/src/main/scala/kafka/tools/GetZkOffsets.scala	(revision 0)
+++ core/src/main/scala/kafka/tools/GetZkOffsets.scala	(revision 0)
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.FileWriter
+import java.util.ArrayList
+import java.util.Iterator
+import java.util.List
+
+import org.apache.zookeeper.WatchedEvent
+import org.apache.zookeeper.Watcher
+import org.apache.zookeeper.ZooKeeper
+
+import org.apache.log4j.Logger
+
+
+/**
+ * A utility that retrieves the offsets of broker partitions from ZK and
+ * prints them to an output file in the following format:
+ *
+ *   test01/1-0:286894308
+ *   test01/2-0:284803985
+ *
+ * This utility expects 3 arguments:
+ *   1. ZK host:port string
+ *   2. consumer group name
+ *   3. output filename
+ *
+ * To print debug messages, add the following line to log4j.properties:
+ *   log4j.logger.kafka.tools.GetZkOffsets$=DEBUG
+ * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
+ */
+object GetZkOffsets {
+
+  private val logger = Logger.getLogger(getClass())
+  private var zk: ZooKeeper = null
+  private var hostPort: String = null
+  private var group: String = null
+  private var outfile: String = null
+
+  def main(args: Array[String]) {
+
+    if (args.length != 3) {
+      usage()
+      System.exit(1)
+    }
+
+    hostPort = args(0)
+    group = args(1)
+    outfile = args(2)
+
+    connectToZK(hostPort)
+    getPartitionOffsets(group, outfile)
+  }
+
+  /**
+   * Recursively collect the paths of all Zookeeper nodes below localRootPath.
+   */
+  private def getPaths(localRootPath: String, zk: ZooKeeper, allPaths: ArrayList[String]) {
+
+    allPaths.add(localRootPath)
+
+    // get child nodes of the current path => zookeeper command: ls
+    val nodes: List[String] = zk.getChildren(localRootPath, false)
+
+    if (nodes == null || nodes.size() == 0)
+      return
+
+    val it: Iterator[String] = nodes.iterator()
+
+    while (it.hasNext()) {
+      val node = it.next()
+      var newLocalRoot = localRootPath
+
+      if (newLocalRoot.endsWith("/"))
+        newLocalRoot = localRootPath + node
+      else
+        newLocalRoot = localRootPath + "/" + node
+
+      // recursively retrieve the paths
+      getPaths(newLocalRoot, zk, allPaths)
+    }
+  }
+
+  private def getPartitionOffsets(group: String, outfile: String) {
+
+    try {
+      val fileWriter: FileWriter = new FileWriter(outfile)
+      val allPaths: ArrayList[String] = new ArrayList[String]
+
+      getPaths("/", zk, allPaths)
+
+      if (logger.isDebugEnabled()) {
+        for (i <- 0 until allPaths.size())
+          logger.debug("Zk node path: " + allPaths.get(i))
+      }
+
+      for (j <- 0 until allPaths.size()) {
+        val path = allPaths.get(j)
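+
+        // Consumer offsets are stored under
+        //   /consumers/<group>/offsets/<topic>/<broker_id>-<partition_id>
+        // so keep only the nodes under this group's offsets prefix whose
+        // last path segment looks like a broker-partition pair.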
+        val nodePathPrefix = "/consumers/" + group + "/offsets/"
+        val nodePathTokens = path.split("/")
+        var lastToken: String = "null"
+
+        if (nodePathTokens.length > 0) {
+          lastToken = nodePathTokens(nodePathTokens.length - 1)
+        }
+
+        // there should be a '-' hyphen between broker_id and partition_id
+        // in the last part of the node path
+        if (path.contains(nodePathPrefix) && lastToken.contains("-")) {
+          val offset: Array[Byte] = zk.getData(path, false, null)
+
+          if (offset != null) {
+            // we only need topic + partition id out of the node path
+            val topicPartId = path.substring(nodePathPrefix.length())
+            val offsetStr = new String(offset)
+
+            if (logger.isDebugEnabled())
+              logger.debug("Topic-partition-offset: " + topicPartId + ":" + offsetStr)
+
+            fileWriter.write(topicPartId + ":" + offsetStr + "\n")
+          }
+          else {
+            logger.debug("Null offset value returned on node path [" + path + "]")
+          }
+        }
+      }
+
+      fileWriter.flush()
+      fileWriter.close()
+    }
+    catch {
+      case ex: Throwable => ex.printStackTrace()
+    }
+  }
+
+  private class MyWatcher extends Watcher {
+    def process(event: WatchedEvent) {
+    }
+  }
+
+  private def connectToZK(hostport: String) {
+    if (zk != null && zk.getState().isAlive()) {
+      zk.close()
+    }
+
+    zk = new ZooKeeper(hostport, 10000, new MyWatcher())
+  }
+
+  private def usage() = {
+    println("USAGE: " + this.getClass.getName + " [zookeeper host:port] [consumer group] [output filename]")
+    System.exit(1)
+  }
+}
Index: bin/zookeeper-offset-dump-to-file.sh
===================================================================
--- bin/zookeeper-offset-dump-to-file.sh	(revision 0)
+++ bin/zookeeper-offset-dump-to-file.sh	(revision 0)
@@ -0,0 +1,53 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# ========================================================
+# zookeeper-offset-dump-to-file.sh
+#
+# Run this script to periodically dump Zookeeper offsets
+# to files at a specified interval.
+#
+# The output files have the following data format:
+#   test01/1-0:16496050
+#   test01/2-0:16885141
+#
+# These data files can later be used to restore Zookeeper
+# offsets with another tool: kafka.tools.UpdateZkOffsetsFromFile.
+# ========================================================
+
+if [ $# != 4 ]; then
+  echo "usage: $0 [zookeeper host:port] [group id] [output dir] [sleep interval in seconds]"
+  exit 1
+fi
+
+host_port=$1
+group_id=$2
+output_dir=$3
+sleep_time=$4
+
+host_name=`echo $host_port | cut -f1 -d ':'`
+port_no=`echo $host_port | cut -f2 -d ':'`
+
+while true
+do
+  datestamp=`date "+%y-%m-%d-%H-%M-%S"`
+  file_name="zk-offsets-${host_name}-${port_no}-${group_id}-${datestamp}.log"
+  $(dirname $0)/kafka-run-class.sh kafka.tools.GetZkOffsets $host_port $group_id $output_dir/${file_name}
+  sleep $sleep_time
+done

Property changes on: bin/zookeeper-offset-dump-to-file.sh
___________________________________________________________________
Added: svn:executable
   + *
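
For reference, an invocation of the two tools added by this patch could look like the following (the host, group name, and paths are placeholders, not values taken from the patch):

  # one-off dump of a consumer group's offsets to a file
  bin/kafka-run-class.sh kafka.tools.GetZkOffsets localhost:2181 test-group /tmp/zk-offsets.log

  # dump the same group's offsets into /tmp/zk-offset-backups every 300 seconds
  bin/zookeeper-offset-dump-to-file.sh localhost:2181 test-group /tmp/zk-offset-backups 300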