From 5af87e404b12c932cd9cd5f8785fc579a97ed8ea Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 24 Mar 2015 15:36:39 -0700 Subject: [PATCH 1/2] KAFKA:2035 added topic config cache. --- core/src/main/scala/kafka/server/KafkaServer.scala | 7 +- .../main/scala/kafka/server/TopicConfigCache.scala | 83 ++++++++++++++++++++++ .../scala/kafka/server/TopicConfigManager.scala | 2 + .../kafka/server/DynamicConfigChangeTest.scala | 13 ++++ .../unit/kafka/server/TopicConfigCacheTest.scala | 54 ++++++++++++++ 5 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/kafka/server/TopicConfigCache.scala create mode 100644 core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dddef93..4436bf9 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -61,6 +61,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var topicConfigManager: TopicConfigManager = null + var topicConfigCache: TopicConfigCache = null + var consumerCoordinator: ConsumerCoordinator = null var kafkaController: KafkaController = null @@ -152,8 +154,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Mx4jLoader.maybeLoad() + /*initialize topic config cache*/ + topicConfigCache = new TopicConfigCache(config.brokerId, zkClient, defaultConfig = config) + /* start topic config manager */ - topicConfigManager = new TopicConfigManager(zkClient, logManager) + topicConfigManager = new TopicConfigManager(zkClient, logManager, topicConfigCache) topicConfigManager.startup() /* tell everyone we are alive */ diff --git a/core/src/main/scala/kafka/server/TopicConfigCache.scala b/core/src/main/scala/kafka/server/TopicConfigCache.scala new file mode 100644 index 0000000..428a291 --- /dev/null +++ b/core/src/main/scala/kafka/server/TopicConfigCache.scala @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.admin.AdminUtils +import kafka.log.LogConfig +import kafka.utils.Logging +import kafka.utils.Utils._ +import java.util.{Properties, Map} +import org.I0Itec.zkclient.ZkClient + +import scala.collection.{Set, mutable} + +/** + * A cache for topic configs that is maintained by each broker, this will not just return the overrides but also defaults. 
+ */ +class TopicConfigCache(brokerId: Int, val zkClient: ZkClient, defaultConfig: KafkaConfig) extends Logging { + private val cache: mutable.Map[String, Properties] = new mutable.HashMap[String, Properties]() + private val lock = new ReentrantReadWriteLock() + + this.logIdent = "[Kafka Topic Config Cache on broker %d] ".format(brokerId) + + private def contains(topic: String) : Boolean = { + inReadLock(lock) { + return cache.contains(topic) + } + } + + /** + * Read the topic config from zookeeper and add it to cache. + * @param topic + */ + private def populateTopicConfig(topic: String): Unit = { + inWriteLock(lock) { + val topicConfig: Properties = defaultConfig.toProps + topicConfig.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + addOrUpdateTopicConfig(topic, topicConfig) + } + } + + /** + * addOrUpdate the topic config cache. + * @param topic + * @param topicConfig + */ + def addOrUpdateTopicConfig(topic: String, topicConfig: Properties) { + inWriteLock(lock) { + cache.put(topic, topicConfig) + } + } + + /** + * returns the topic config, the config has overrides and defaults, if the topic config is not present in the cache + * it will be read from zookeeper and added to the cache. + * @param topic + * @return + */ + def getTopicConfig(topic: String): Properties = { + if(contains(topic)) { + return cache(topic) + } + + populateTopicConfig(topic) + return getTopicConfig(topic) + } +} diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 47295d4..529007a 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -60,6 +60,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient} */ class TopicConfigManager(private val zkClient: ZkClient, private val logManager: LogManager, + private val topicConfigCache: TopicConfigCache, private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L @@ -103,6 +104,7 @@ class TopicConfigManager(private val zkClient: ZkClient, /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties(logManager.defaultConfig.toProps) props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + topicConfigCache.addOrUpdateTopicConfig(topic, props) val logConfig = LogConfig.fromProps(props) for (log <- logsByTopic(topic)) log.config = logConfig diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 93182ae..4043ea4 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -39,10 +39,23 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { val logOpt = this.servers(0).logManager.getLog(tp) assertTrue(logOpt.isDefined) assertEquals(oldVal, logOpt.get.config.flushInterval) + + //check config cache gets populated for a new topic. 
+ val config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertNotNull(config) + assertFalse(config.isEmpty) + assertEquals(oldVal, LogConfig.fromProps(config).flushInterval) } + AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps) TestUtils.retry(10000) { assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval) + + //check config cache was updated with the new values. + val config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertNotNull(config) + assertFalse(config.isEmpty) + assertEquals(newVal, LogConfig.fromProps(config).flushInterval) } } diff --git a/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala b/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala new file mode 100644 index 0000000..8de3245 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import java.util.Properties + +import kafka.admin.{AdminOperationException, AdminUtils} +import kafka.cluster.Broker +import kafka.common.TopicAndPartition +import kafka.integration.KafkaServerTestHarness +import kafka.log.LogConfig +import kafka.server.{TopicConfigCache, KafkaConfig, KafkaServer} +import kafka.utils.TestUtils +import junit.framework.Assert._ +import kafka.zk.ZooKeeperTestHarness +import org.scalatest.junit.JUnit3Suite + +class TopicConfigCacheTest extends JUnit3Suite with KafkaServerTestHarness { + + override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) + + def testConfigCache { + var config = this.servers(0).topicConfigCache.getTopicConfig("not-existing-topic") + assertEquals("even for non existing topic we will return default config.",this.servers(0).config.toProps, config) + + //newly created topics should be populated in cache on first request. + val oldVal = 100000 + val tp = TopicAndPartition("test", 0) + AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps) + config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertEquals(oldVal, LogConfig.fromProps(config).flushInterval) + + //test that addOrupdate works + val newVal = 20000 + config = LogConfig(flushInterval = newVal).toProps + this.servers(0).topicConfigCache.addOrUpdateTopicConfig(tp.topic, config) + config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertEquals(newVal, LogConfig.fromProps(config).flushInterval) + } +} -- 1.9.5 (Apple Git-50.3) From e7d60a9a2878a59062c1c5d5d0b3668d89d49ce2 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 31 Mar 2015 10:46:30 -0700 Subject: [PATCH 2/2] KAFKA-2035: Added TopicConfigCache. 
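
Follow-up to patch 1: return a TopicConfig wrapper (the LogConfig built from broker
defaults plus the per-topic overrides, together with the raw override Properties)
instead of a flat Properties object, merge the defaults via LogConfig.fromProps at
insert time, take the read lock for the lookup inside getTopicConfig, and adjust
DynamicConfigChangeTest and TopicConfigCacheTest to assert against
logConfig/overrideProperties.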
--- .../main/scala/kafka/server/TopicConfigCache.scala | 38 +++++++++++----------- .../kafka/server/DynamicConfigChangeTest.scala | 8 ++--- .../unit/kafka/server/TopicConfigCacheTest.scala | 21 +++++------- 3 files changed, 29 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/kafka/server/TopicConfigCache.scala b/core/src/main/scala/kafka/server/TopicConfigCache.scala index 428a291..bd2113c 100644 --- a/core/src/main/scala/kafka/server/TopicConfigCache.scala +++ b/core/src/main/scala/kafka/server/TopicConfigCache.scala @@ -17,67 +17,67 @@ package kafka.server +import java.util.Properties import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.admin.AdminUtils import kafka.log.LogConfig import kafka.utils.Logging import kafka.utils.Utils._ -import java.util.{Properties, Map} import org.I0Itec.zkclient.ZkClient -import scala.collection.{Set, mutable} +import scala.collection.{mutable} /** * A cache for topic configs that is maintained by each broker, this will not just return the overrides but also defaults. */ class TopicConfigCache(brokerId: Int, val zkClient: ZkClient, defaultConfig: KafkaConfig) extends Logging { - private val cache: mutable.Map[String, Properties] = new mutable.HashMap[String, Properties]() + private val cache: mutable.Map[String, TopicConfig] = new mutable.HashMap[String, TopicConfig]() private val lock = new ReentrantReadWriteLock() this.logIdent = "[Kafka Topic Config Cache on broker %d] ".format(brokerId) - private def contains(topic: String) : Boolean = { - inReadLock(lock) { - return cache.contains(topic) - } - } - /** * Read the topic config from zookeeper and add it to cache. * @param topic */ private def populateTopicConfig(topic: String): Unit = { inWriteLock(lock) { - val topicConfig: Properties = defaultConfig.toProps - topicConfig.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) - addOrUpdateTopicConfig(topic, topicConfig) + val overrideProperties: Properties = AdminUtils.fetchTopicConfig(zkClient, topic) + addOrUpdateTopicConfig(topic, overrideProperties) } } /** * addOrUpdate the topic config cache. * @param topic - * @param topicConfig + * @param overrideProperties */ - def addOrUpdateTopicConfig(topic: String, topicConfig: Properties) { + def addOrUpdateTopicConfig(topic: String, overrideProperties: Properties) { inWriteLock(lock) { + val logConfig: LogConfig = LogConfig.fromProps(defaultConfig.toProps, overrideProperties) + + val topicConfig: TopicConfig = new TopicConfig(logConfig, overrideProperties) cache.put(topic, topicConfig) } } /** - * returns the topic config, the config has overrides and defaults, if the topic config is not present in the cache - * it will be read from zookeeper and added to the cache. + * Returns the topic config. 
* @param topic * @return */ - def getTopicConfig(topic: String): Properties = { - if(contains(topic)) { - return cache(topic) + def getTopicConfig(topic: String): TopicConfig = { + inReadLock(lock) { + if(cache.contains(topic)) { + return cache(topic) + } } populateTopicConfig(topic) + return getTopicConfig(topic) } } + +class TopicConfig(val logConfig: LogConfig, val overrideProperties: Properties) diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 4043ea4..2d47146 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -42,9 +42,7 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { //check config cache gets populated for a new topic. val config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) - assertNotNull(config) - assertFalse(config.isEmpty) - assertEquals(oldVal, LogConfig.fromProps(config).flushInterval) + assertEquals(oldVal, config.logConfig.flushInterval) } AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps) @@ -53,9 +51,7 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { //check config cache was updated with the new values. val config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) - assertNotNull(config) - assertFalse(config.isEmpty) - assertEquals(newVal, LogConfig.fromProps(config).flushInterval) + assertEquals(newVal, config.logConfig.flushInterval) } } diff --git a/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala b/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala index 8de3245..b1862bc 100644 --- a/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala @@ -16,17 +16,13 @@ */ package unit.kafka.server -import java.util.Properties - -import kafka.admin.{AdminOperationException, AdminUtils} -import kafka.cluster.Broker +import junit.framework.Assert._ +import kafka.admin.AdminUtils import kafka.common.TopicAndPartition import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig -import kafka.server.{TopicConfigCache, KafkaConfig, KafkaServer} +import kafka.server.{KafkaConfig, TopicConfig} import kafka.utils.TestUtils -import junit.framework.Assert._ -import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite class TopicConfigCacheTest extends JUnit3Suite with KafkaServerTestHarness { @@ -34,21 +30,20 @@ class TopicConfigCacheTest extends JUnit3Suite with KafkaServerTestHarness { override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) def testConfigCache { - var config = this.servers(0).topicConfigCache.getTopicConfig("not-existing-topic") - assertEquals("even for non existing topic we will return default config.",this.servers(0).config.toProps, config) + var config: TopicConfig = this.servers(0).topicConfigCache.getTopicConfig("not-existing-topic") + assertTrue("for non existing topic override property should be empty. ",config.overrideProperties.isEmpty) //newly created topics should be populated in cache on first request. 
val oldVal = 100000 val tp = TopicAndPartition("test", 0) AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps) config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) - assertEquals(oldVal, LogConfig.fromProps(config).flushInterval) + assertEquals(oldVal, config.logConfig.flushInterval) //test that addOrupdate works val newVal = 20000 - config = LogConfig(flushInterval = newVal).toProps - this.servers(0).topicConfigCache.addOrUpdateTopicConfig(tp.topic, config) + this.servers(0).topicConfigCache.addOrUpdateTopicConfig(tp.topic, LogConfig(flushInterval = newVal).toProps) config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) - assertEquals(newVal, LogConfig.fromProps(config).flushInterval) + assertEquals(newVal, config.logConfig.flushInterval) } } -- 1.9.5 (Apple Git-50.3)
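
Usage sketch (illustrative only, not part of either patch): one way a broker-side
component could consume the cache once this series is applied. TopicConfigReader,
flushIntervalFor and hasOverride are hypothetical names introduced for this example;
TopicConfigCache, TopicConfig, getTopicConfig, logConfig and overrideProperties are
the APIs the patches add, and "flush.messages" is assumed to be the override key
written by LogConfig(flushInterval = ...).

    import java.util.Properties

    import kafka.server.{TopicConfig, TopicConfigCache}

    // Hypothetical consumer of TopicConfigCache, for illustration only.
    class TopicConfigReader(topicConfigCache: TopicConfigCache) {

      // Effective flush interval for a topic: broker defaults merged with any
      // zookeeper overrides. The first call for an unknown topic populates the
      // cache from zookeeper via populateTopicConfig.
      def flushIntervalFor(topic: String): Long = {
        val topicConfig: TopicConfig = topicConfigCache.getTopicConfig(topic)
        topicConfig.logConfig.flushInterval
      }

      // True if the topic carries an explicit override for the given key
      // (e.g. "flush.messages"); broker defaults are not reported as overrides.
      def hasOverride(topic: String, key: String): Boolean = {
        val overrides: Properties = topicConfigCache.getTopicConfig(topic).overrideProperties
        overrides.containsKey(key)
      }
    }

Design note on the locking in getTopicConfig (patch 2): the lookup takes the read
lock and only falls back to populateTopicConfig (write lock) on a miss, so readers
of already-cached topics do not serialize; two concurrent misses may both fetch the
config from zookeeper, which is harmless because the second put simply overwrites
the first.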