diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 416ecad..2ef75e4 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -16,7 +16,7 @@
 
 if [ $# -lt 1 ];
 then
-  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
+  echo "USAGE: $0 classname [opts]"
   exit 1
 fi
 
@@ -69,8 +69,6 @@ if [ -z "$KAFKA_LOG4J_OPTS" ]; then
   KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties"
 fi
 
-KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
-
 # Generic jvm settings you want to add
 if [ -z "$KAFKA_OPTS" ]; then
   KAFKA_OPTS=""
@@ -93,45 +91,16 @@ if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
   KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
 fi
-
-while [ $# -gt 0 ]; do
-  COMMAND=$1
-  case $COMMAND in
-    -name)
-      DAEMON_NAME=$2
-      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
-      shift 2
-      ;;
-    -loggc)
-      if [ -z "$KAFKA_GC_LOG_OPTS"] ; then
-        GC_LOG_ENABLED="true"
-      fi
-      shift
-      ;;
-    -daemon)
-      DAEMON_MODE="true"
-      shift
-      ;;
-    *)
-      break
-      ;;
-  esac
-done
-
 # GC options
 GC_FILE_SUFFIX='-gc.log'
 GC_LOG_FILE_NAME=''
-if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
-  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
+if [ "$1" = "daemon" ] && [ -z "$KAFKA_GC_LOG_OPTS" ] ; then
+  shift
+  GC_LOG_FILE_NAME=$1$GC_FILE_SUFFIX
+  shift
   KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
 fi
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
-else
-  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
-fi
-
+exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
diff --git a/bin/kafka-server-start.sh b/bin/kafka-server-start.sh
index 7050649..4a36b2d 100755
--- a/bin/kafka-server-start.sh
+++ b/bin/kafka-server-start.sh
@@ -16,23 +16,10 @@
 
 if [ $# -lt 1 ];
 then
-  echo "USAGE: $0 [-daemon] server.properties"
+  echo "USAGE: $0 server.properties"
   exit 1
 fi
 
 base_dir=$(dirname $0)
 export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
-
-EXTRA_ARGS="-name kafkaServer -loggc"
-
-COMMAND=$1
-case $COMMAND in
-  -daemon)
-    EXTRA_ARGS="-daemon "$EXTRA_ARGS
-    shift
-    ;;
-  *)
-    ;;
-esac
-
-exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $@
+exec $base_dir/kafka-run-class.sh daemon kafkaServer kafka.Kafka $@
diff --git a/bin/zookeeper-server-start.sh b/bin/zookeeper-server-start.sh
index 2e7be74..cdbbf33 100755
--- a/bin/zookeeper-server-start.sh
+++ b/bin/zookeeper-server-start.sh
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-if [ $# -lt 1 ];
+if [ $# -ne 1 ];
 then
   echo "USAGE: $0 zookeeper.properties"
   exit 1
@@ -22,18 +22,5 @@ fi
 
 base_dir=$(dirname $0)
 export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
 export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
-
-EXTRA_ARGS="-name zookeeper -loggc"
-
-COMMAND=$1
-case $COMMAND in
-  -daemon)
-    EXTRA_ARGS="-daemon "$EXTRA_ARGS
-    shift
-    ;;
-  *)
-    ;;
-esac
-
-exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+exec $base_dir/kafka-run-class.sh daemon zookeeper org.apache.zookeeper.server.quorum.QuorumPeerMain $@
diff --git a/config/log4j.properties b/config/log4j.properties
index 1ab8507..782124d 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -12,9 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-kafka.logs.dir=logs
-
 log4j.rootLogger=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -23,19 +20,19 @@
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.File=logs/server.log
 log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.File=logs/state-change.log
 log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.File=logs/kafka-request.log
 log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
@@ -47,7 +44,7 @@
 log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.File=logs/controller.log
 log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a167756..8ff4bd5 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -182,7 +182,7 @@ object AdminUtils extends Logging {
   private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
-      val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
+      val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
 
       if (!update) {
         info("Topic creation " + jsonPartitionData.toString)
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 9b3c6ae..26beb96 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -89,8 +89,13 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   def writePreferredReplicaElectionData(zkClient: ZkClient,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-    val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
-    val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+    var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
+    for (p <- partitionsUndergoingPreferredReplicaElection) {
+      partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++
+        Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false))
+    }
+    val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false)
+    val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false)
     try {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3401afa..981d2bb 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -37,7 +37,11 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
   override def toString(): String = {
-    Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
+    val jsonDataMap = new collection.mutable.HashMap[String, String]
+    jsonDataMap.put("leader", leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
+    jsonDataMap.put("ISR", isr.mkString(","))
+    Utils.mapToJson(jsonDataMap, valueInQuotes = true)
   }
 }
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index fca76a2..08dc3cd 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -44,11 +44,7 @@ object OffsetResponse {
 }
 
 
-case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
-  override def toString(): String = {
-    new String("error: " + ErrorMapping.exceptionFor(error).getClass.getName + " offsets: " + offsets.mkString)
-  }
-}
+case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
 
 
 case class OffsetResponse(override val correlationId: Int,
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index e332633..a3eb53e 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -25,7 +25,7 @@ import kafka.common.KafkaException
 
 private[kafka] trait TopicCount {
   def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
-  def getTopicCountMap: Map[String, Int]
+  def dbString: String
   def pattern: String
 
   protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
@@ -111,7 +111,24 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
     }
   }
 
-  def getTopicCountMap = topicCountMap
+  /**
+   * Returns JSON of the form:
+   * { "topic1" : 4,
+   *   "topic2" : 4 }
+   */
+  def dbString = {
+    val builder = new StringBuilder
+    builder.append("{ ")
+    var i = 0
+    for ( (topic, nConsumers) <- topicCountMap) {
+      if (i > 0)
+        builder.append(",")
+      builder.append("\"" + topic + "\": " + nConsumers)
+      i += 1
+    }
+    builder.append(" }")
+    builder.toString()
+  }
 
   def pattern = TopicCount.staticPattern
 }
@@ -125,7 +142,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
     makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 
-  def getTopicCountMap = Map(topicFilter.regex -> numStreams)
+  def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams)
 
   def pattern: String = {
     topicFilter match {
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index cf3853b..4f20823 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -41,14 +41,10 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
   override def toString = regex
 
-  def requiresTopicEventWatcher: Boolean
-
   def isTopicAllowed(topic: String): Boolean
 }
 
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")
-
   override def isTopicAllowed(topic: String) = {
     val allowed = topic.matches(regex)
 
@@ -62,8 +58,6 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 }
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def requiresTopicEventWatcher = true
-
   override def isTopicAllowed(topic: String) = {
     val allowed = !topic.matches(regex)
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 6d0cfa6..bba304e 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,11 +220,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
     val timestamp = SystemTime.milliseconds.toString
-    val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
-                                               "timestamp" -> timestamp))
+    val consumerRegistrationInfo =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
+      ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
 
-    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null,
-                                                 (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) info("end registering consumer " + consumerIdString + " in ZK") } @@ -754,19 +754,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount) reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) - if (!topicFilter.requiresTopicEventWatcher) { - info("Not creating event watcher for trivial whitelist " + topicFilter) - } - else { - info("Creating topic event watcher for whitelist " + topicFilter) - wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this) - - /* - * Topic events will trigger subsequent synced rebalances. Also, the - * consumer will get registered only after an allowed topic becomes - * available. - */ - } + /* + * Topic events will trigger subsequent synced rebalances. + */ + info("Creating topic event watcher for topics " + topicFilter) + wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this) def handleTopicEvent(allTopics: Seq[String]) { debug("Handling topic event") diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index a67c193..38f4ec0 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -22,14 +22,11 @@ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState -class ZookeeperTopicEventWatcher(val config:ConsumerConfig, +class ZookeeperTopicEventWatcher(val zkClient: ZkClient, val eventHandler: TopicEventHandler[String]) extends Logging { val lock = new Object() - private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, ZKStringSerializer) - startWatchingTopicEvents() private def startWatchingTopicEvents() { @@ -53,11 +50,10 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, info("Shutting down topic event watcher.") if (zkClient != null) { stopWatchingTopicEvents() - zkClient.close() - zkClient = null } - else - warn("Cannot shutdown already shutdown topic event watcher.") + else { + warn("Cannot shutdown since the embedded zookeeper client has already closed.") + } } } @@ -70,7 +66,6 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, if (zkClient != null) { val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList debug("all topics: %s".format(latestTopics)) - eventHandler.handleTopicEvent(latestTopics) } } @@ -93,10 +88,8 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, def handleNewSession() { lock.synchronized { if (zkClient != null) { - info( - "ZK expired: resubscribing topic event listener to topic registry") - zkClient.subscribeChildChanges( - ZkUtils.BrokerTopicsPath, topicEventListener) + info("ZK expired: resubscribing topic event listener to topic registry") + zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) } } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4c319ab..88792c2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -722,7 
@@ -722,7 +722,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                                             newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
     try {
       val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+      val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
       ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index cc6f1eb..33b7360 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.{Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
@@ -49,7 +49,9 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     val timestamp = SystemTime.milliseconds.toString
-    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
+    val electString =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
+      ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
 
     try {
       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
deleted file mode 100644
index f1f139e..0000000
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import joptsimple.OptionParser
-import kafka.cluster.Broker
-import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicReference
-import kafka.client.ClientUtils
-import java.util.regex.{PatternSyntaxException, Pattern}
-import kafka.api._
-import java.text.SimpleDateFormat
-import java.util.Date
-import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.utils._
-import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
-
-/**
- * For verifying the consistency among replicas.
- *
- * 1. start a fetcher on every broker.
- * 2. each fetcher does the following
- *   2.1 issues fetch request
- *   2.2 puts the fetched result in a shared buffer
- *   2.3 waits for all other fetchers to finish step 2.2
- *   2.4 one of the fetchers verifies the consistency of fetched results among replicas
- *
- * The consistency verification is up to the high watermark. The tool reports the
- * max lag between the verified offset and the high watermark among all partitions.
- *
- * If a broker goes down, the verification of the partitions on that broker is delayed
- * until the broker is up again.
- *
- * Caveats:
- * 1. The tools needs all brokers to be up at startup time.
- * 2. The tool doesn't handle out of range offsets.
- */
-
-object ReplicaVerificationTool extends Logging {
-  val clientId= "replicaVerificationTool"
-  val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-
-  def getCurrentTimeString() = {
-    ReplicaVerificationTool.dateFormat.format(new Date(SystemTime.milliseconds))
-  }
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
-                         .withRequiredArg
-                         .describedAs("hostname:port,...,hostname:port")
-                         .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
-                         .withRequiredArg
-                         .describedAs("bytes")
-                         .ofType(classOf[java.lang.Integer])
-                         .defaultsTo(ConsumerConfig.FetchSize)
-    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
-                         .withRequiredArg
-                         .describedAs("ms")
-                         .ofType(classOf[java.lang.Integer])
-                         .defaultsTo(1000)
-    val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.")
Defaults to all topics.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - .defaultsTo(".*") - val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) - val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(30 * 1000L) - - - val options = parser.parse(args : _*) - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) - - val regex = options.valueOf(topicWhiteListOpt) - val topicWhiteListFiler = new Whitelist(regex) - - try { - Pattern.compile(regex) - } - catch { - case e: PatternSyntaxException => - throw new RuntimeException(regex + " is an invalid regex.") - } - - val fetchSize = options.valueOf(fetchSizeOpt).intValue - val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue - val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue - val reportInterval = options.valueOf(reportIntervalOpt).longValue - // getting topic metadata - info("Getting topic metatdata...") - val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) - val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) - val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata) - val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic)) true else false - ) - val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( - topicMetadataResponse => - topicMetadataResponse.partitionsMetadata.flatMap( - partitionMetadata => - partitionMetadata.replicas.map(broker => - TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)) - ) - ) - debug("Selected topic partitions: " + topicPartitionReplicaList) - val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) - .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } - debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) - val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } - debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) - val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( - topicMetadataResponse => - topicMetadataResponse.partitionsMetadata.map( - partitionMetadata => - (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) - ).groupBy(_._2) - .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { - case(topicAndPartition, leaderId) => topicAndPartition }) - debug("Leaders per broker: " + leadersPerBroker) - - val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - 
-                                          reportInterval)
-    // create all replica fetcher threads
-    val verificationBrokerId = topicAndPartitionsPerBroker.head._1
-    val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
-      case (brokerId, topicAndPartitions) =>
-        new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
-                           sourceBroker = brokerMap(brokerId),
-                           topicAndPartitions = topicAndPartitions,
-                           replicaBuffer = replicaBuffer,
-                           socketTimeout = 30000,
-                           socketBufferSize = 256000,
-                           fetchSize = fetchSize,
-                           maxWait = maxWaitMs,
-                           minBytes = 1,
-                           doVerification = (brokerId == verificationBrokerId))
-    }
-
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        info("Stopping all fetchers")
-        fetcherThreads.foreach(_.shutdown())
-      }
-    })
-    fetcherThreads.foreach(_.start())
-    println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.")
-
-  }
-}
-
-private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
-
-private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset])
-
-private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
-
-private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
-                            leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
-                            expectedNumFetchers: Int,
-                            brokerMap: Map[Int, Broker],
-                            initialOffsetTime: Long,
-                            reportInterval: Long) extends Logging {
-  private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
-  private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
-  private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
-  private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
-  @volatile private var lastReportTime = SystemTime.milliseconds
-  private var maxLag: Long = -1L
-  private var offsetWithMaxLag: Long = -1L
-  private var maxLagTopicAndPartition: TopicAndPartition = null
-  initialize()
-
-  def createNewFetcherBarrier() {
-    fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
-  }
-
-  def getFetcherBarrier() = fetcherBarrier.get()
-
-  def createNewVerificationBarrier() {
-    verificationBarrier.set(new CountDownLatch(1))
-  }
-
-  def getVerificationBarrier() = verificationBarrier.get()
-
-  private def initialize() {
-    for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet)
-      messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData])
-    setInitialOffsets()
-  }
-
-  private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
-    offsetResponse.partitionErrorAndOffsets.filter {
-      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != ErrorMapping.NoError
-    }.mkString
-  }
-
-  private def setInitialOffsets() {
-    for ((brokerId, topicAndPartitions) <- leadersPerBroker) {
-      val broker = brokerMap(brokerId)
-      val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId)
-      val initialOffsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] =
-        topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap
-      val offsetRequest = OffsetRequest(initialOffsetMap)
-      val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
-      assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
-      offsetResponse.partitionErrorAndOffsets.foreach{
-        case (topicAndPartition, partitionOffsetResponse) =>
-          fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
-      }
-    }
-  }
-
-  def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: FetchResponsePartitionData) {
-    messageSetCache.get(topicAndPartition).put(replicaId, partitionData)
-  }
-
-  def getOffset(topicAndPartition: TopicAndPartition) = {
-    fetchOffsetMap.get(topicAndPartition)
-  }
-
-  def verifyCheckSum() {
-    debug("Begin verification")
-    maxLag = -1L
-    for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
-      debug("Verifying " + topicAndPartition)
-      assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
-             "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
-             + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
-      val messageIteratorMap = fetchResponsePerReplica.map {
-        case(replicaId, fetchResponse) =>
-          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator}
-      val maxHw = fetchResponsePerReplica.values.map(_.hw).max
-
-      // Iterate one message at a time from every replica, until high watermark is reached.
-      var isMessageInAllReplicas = true
-      while (isMessageInAllReplicas) {
-        var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ( (replicaId, messageIterator) <- messageIteratorMap) {
-          if (messageIterator.hasNext) {
-            val messageAndOffset = messageIterator.next()
-
-            // only verify up to the high watermark
-            if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
-              isMessageInAllReplicas = false
-            else {
-              messageInfoFromFirstReplicaOpt match {
-                case None =>
-                  messageInfoFromFirstReplicaOpt = Some(
-                    MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
-                case Some(messageInfoFromFirstReplica) =>
-                  if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
-                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
-                      + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
-                      + messageInfoFromFirstReplica.offset + " doesn't match replica "
-                      + replicaId + "'s offset " + messageAndOffset.offset)
-                    System.exit(1)
-                  }
-                  if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
-                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
-                      + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
-                      + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
-                      + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
-              }
-            }
-          } else
-            isMessageInAllReplicas = false
-        }
-        if (isMessageInAllReplicas) {
-          val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
-          fetchOffsetMap.put(topicAndPartition, nextOffset)
-          debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " +
-                nextOffset + " for " + topicAndPartition)
-        }
-      }
-      if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
-        offsetWithMaxLag = fetchOffsetMap.get(topicAndPartition)
-        maxLag = maxHw - offsetWithMaxLag
-        maxLagTopicAndPartition = topicAndPartition
-      }
-      fetchResponsePerReplica.clear()
-    }
-    val currentTimeMs = SystemTime.milliseconds
-    if (currentTimeMs - lastReportTime > reportInterval) {
-      println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
-        + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
-        + " among " + messageSetCache.size + " paritions")
-      lastReportTime = currentTimeMs
-    }
-  }
-}
-
-private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition],
-                             replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
-                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
-  extends ShutdownableThread(name) {
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId)
-  val fetchRequestBuilder = new FetchRequestBuilder().
-          clientId(ReplicaVerificationTool.clientId).
-          replicaId(Request.DebuggingConsumerId).
-          maxWait(maxWait).
-          minBytes(minBytes)
-
-  override def doWork() {
-
-    val fetcherBarrier = replicaBuffer.getFetcherBarrier()
-    val verificationBarrier = replicaBuffer.getVerificationBarrier()
-
-    for (topicAndPartition <- topicAndPartitions)
-      fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-        replicaBuffer.getOffset(topicAndPartition), fetchSize)
-
-    val fetchRequest = fetchRequestBuilder.build()
-    debug("Issuing fetch request " + fetchRequest)
-
-    var response: FetchResponse = null
-    try {
-      response = simpleConsumer.fetch(fetchRequest)
-    } catch {
-      case t: Throwable =>
-        if (!isRunning.get)
-          throw t
-    }
-
-    if (response != null) {
-      response.data.foreach {
-        case(topicAndPartition, partitionData) =>
-          replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
-      }
-    } else {
-      for (topicAndPartition <- topicAndPartitions)
-        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty))
-    }
-
-    fetcherBarrier.countDown()
-    debug("Done fetching")
-
-    // wait for all fetchers to finish
-    fetcherBarrier.await()
-    debug("Ready for verification")
-
-    // one of the fetchers will do the verification
-    if (doVerification) {
-      debug("Do verification")
-      replicaBuffer.verifyCheckSum()
-      replicaBuffer.createNewFetcherBarrier()
-      replicaBuffer.createNewVerificationBarrier()
-      debug("Created new barrier")
-      verificationBarrier.countDown()
-    }
-
-    verificationBarrier.await()
-    debug("Done verification")
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index a89b046..c9ca95f 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -446,6 +446,65 @@ object Utils extends Logging {
   def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
 
   /**
+   * Merge JSON fields of the format "key" : value/object/array.
+   */
+  def mergeJsonFields(objects: Seq[String]): String = {
+    val builder = new StringBuilder
+    builder.append("{ ")
+    builder.append(objects.sorted.map(_.trim).mkString(", "))
+    builder.append(" }")
+    builder.toString
+  }
+
+  /**
+   * Format a Map[String, String] as a list of JSON fields.
+   */
+  def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
+    val jsonFields: mutable.ListBuffer[String] = ListBuffer()
+    val builder = new StringBuilder
+    for ((key, value) <- jsonDataMap.toList.sorted) {
+      builder.append("\"" + key + "\":")
+      if (valueInQuotes)
+        builder.append("\"" + value + "\"")
+      else
+        builder.append(value)
+      jsonFields += builder.toString
+      builder.clear()
+    }
+    jsonFields
+  }
+
+  /**
+   * Format a Map[String, String] as a JSON object.
+   */
+  def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
+    mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes))
+  }
+
+  /**
+   * Format a Seq[String] as a JSON array.
+   */
+  def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
+    val builder = new StringBuilder
+    builder.append("[ ")
+    if (valueInQuotes)
+      builder.append(jsonData.map("\"" + _ + "\"").mkString(", "))
+    else
+      builder.append(jsonData.mkString(", "))
+    builder.append(" ]")
+    builder.toString
+  }
+
+  /**
+   * Format a Map[String, Seq[Int]] as JSON.
+   */
+
+  def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
+    mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))),
+                                    valueInQuotes = false))
+  }
+
+  /**
    * Create a circular (looping) iterator over a collection.
    * @param coll An iterable over the underlying collection.
    * @return A circular iterator over the collection.
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 73902b2..856d136 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -32,11 +32,10 @@ import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
 import kafka.controller.PartitionAndReplica
 import kafka.controller.KafkaController
-import scala.{collection, Some}
+import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 import kafka.utils.Utils.inLock
-import scala.collection
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -193,8 +192,11 @@ object ZkUtils extends Logging {
 
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    val timestamp = SystemTime.milliseconds.toString
-    val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+    val timestamp = "\"" + SystemTime.milliseconds.toString + "\""
+    val brokerInfo =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
+        Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
+          valueInQuotes = false))
     val expectedBroker = new Broker(id, host, port)
 
     try {
@@ -217,17 +219,18 @@ object ZkUtils extends Logging {
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
   def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
-      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
+    val isrInfo = Utils.seqToJson(leaderAndIsr.isr.map(_.toString), valueInQuotes = false)
+    Utils.mapToJson(Map("version" -> 1.toString, "leader" -> leaderAndIsr.leader.toString, "leader_epoch" -> leaderAndIsr.leaderEpoch.toString,
+                        "controller_epoch" -> controllerEpoch.toString, "isr" -> isrInfo), valueInQuotes = false)
   }
 
   /**
    * Get JSON partition to replica map from zookeeper.
    */
-  def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
-    Json.encode(Map("version" -> 1, "partitions" -> map))
+  def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
+    val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map)
+    Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
   }
 
   /**
@@ -653,8 +656,16 @@
   }
 
   def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
-    Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
-      "replicas" -> e._2))))
+    var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
+    for (p <- partitionsToBeReassigned) {
+      val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false)
+      val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true)
+      val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData),
+                                                    valueInQuotes = false)
+      jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData)
+    }
+    Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)),
+                    valueInQuotes = false)
   }
 
   def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 40a2bf7..cf2724b 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -29,16 +29,13 @@ class TopicFilterTest extends JUnitSuite {
   def testWhitelists() {
 
     val topicFilter1 = new Whitelist("white1,white2")
-    assertFalse(topicFilter1.requiresTopicEventWatcher)
     assertTrue(topicFilter1.isTopicAllowed("white2"))
     assertFalse(topicFilter1.isTopicAllowed("black1"))
 
     val topicFilter2 = new Whitelist(".+")
-    assertTrue(topicFilter2.requiresTopicEventWatcher)
     assertTrue(topicFilter2.isTopicAllowed("alltopics"))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
-    assertTrue(topicFilter3.requiresTopicEventWatcher)
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
     assertFalse(topicFilter3.isTopicAllowed("black1"))
   }
@@ -46,6 +43,5 @@ class TopicFilterTest extends JUnitSuite {
   @Test
   def testBlacklists() {
     val topicFilter1 = new Blacklist("black1")
-    assertTrue(topicFilter1.requiresTopicEventWatcher)
   }
 }
\ No newline at end of file
diff --git a/system_test/migration_tool_testsuite/0.7/config/test-log4j.properties b/system_test/migration_tool_testsuite/0.7/config/test-log4j.properties
deleted file mode 100644
index a3ae33f..0000000
--- a/system_test/migration_tool_testsuite/0.7/config/test-log4j.properties
+++ /dev/null
@@ -1,68 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.kafkaAppender.File=logs/server.log
-log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.stateChangeAppender.File=logs/state-change.log
-log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.requestAppender.File=logs/kafka-request.log
-log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.controllerAppender.File=logs/controller.log
-log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-# Turn on all our debugging info
-#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
-#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
-log4j.logger.kafka.perf=DEBUG, kafkaAppender
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka=INFO, kafkaAppender
-
-log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
-log4j.additivity.kafka.network.RequestChannel$=false
-
-#log4j.logger.kafka.network.Processor=TRACE, requestAppender
-#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
-#log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
-log4j.additivity.kafka.request.logger=false
-
-log4j.logger.kafka.controller=TRACE, controllerAppender
-log4j.additivity.kafka.controller=false
-
-log4j.logger.state.change.logger=TRACE, stateChangeAppender
-log4j.additivity.state.change.logger=false
-