From ad45b77834fdca3afc509f3fc30f18f4eef1297e Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 24 Mar 2015 15:36:39 -0700 Subject: [PATCH] KAFKA:2032 added topic config cache. --- core/src/main/scala/kafka/server/KafkaServer.scala | 7 +- .../main/scala/kafka/server/TopicConfigCache.scala | 83 ++++++++++++++++++++++ .../scala/kafka/server/TopicConfigManager.scala | 2 + .../kafka/server/DynamicConfigChangeTest.scala | 13 ++++ .../unit/kafka/server/TopicConfigCacheTest.scala | 54 ++++++++++++++ 5 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/kafka/server/TopicConfigCache.scala create mode 100644 core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dddef93..4436bf9 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -61,6 +61,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var topicConfigManager: TopicConfigManager = null + var topicConfigCache: TopicConfigCache = null + var consumerCoordinator: ConsumerCoordinator = null var kafkaController: KafkaController = null @@ -152,8 +154,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Mx4jLoader.maybeLoad() + /*initialize topic config cache*/ + topicConfigCache = new TopicConfigCache(config.brokerId, zkClient, defaultConfig = config) + /* start topic config manager */ - topicConfigManager = new TopicConfigManager(zkClient, logManager) + topicConfigManager = new TopicConfigManager(zkClient, logManager, topicConfigCache) topicConfigManager.startup() /* tell everyone we are alive */ diff --git a/core/src/main/scala/kafka/server/TopicConfigCache.scala b/core/src/main/scala/kafka/server/TopicConfigCache.scala new file mode 100644 index 0000000..428a291 --- /dev/null +++ b/core/src/main/scala/kafka/server/TopicConfigCache.scala @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.admin.AdminUtils +import kafka.log.LogConfig +import kafka.utils.Logging +import kafka.utils.Utils._ +import java.util.{Properties, Map} +import org.I0Itec.zkclient.ZkClient + +import scala.collection.{Set, mutable} + +/** + * A cache for topic configs that is maintained by each broker, this will not just return the overrides but also defaults. + */ +class TopicConfigCache(brokerId: Int, val zkClient: ZkClient, defaultConfig: KafkaConfig) extends Logging { + private val cache: mutable.Map[String, Properties] = new mutable.HashMap[String, Properties]() + private val lock = new ReentrantReadWriteLock() + + this.logIdent = "[Kafka Topic Config Cache on broker %d] ".format(brokerId) + + private def contains(topic: String) : Boolean = { + inReadLock(lock) { + return cache.contains(topic) + } + } + + /** + * Read the topic config from zookeeper and add it to cache. + * @param topic + */ + private def populateTopicConfig(topic: String): Unit = { + inWriteLock(lock) { + val topicConfig: Properties = defaultConfig.toProps + topicConfig.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + addOrUpdateTopicConfig(topic, topicConfig) + } + } + + /** + * addOrUpdate the topic config cache. + * @param topic + * @param topicConfig + */ + def addOrUpdateTopicConfig(topic: String, topicConfig: Properties) { + inWriteLock(lock) { + cache.put(topic, topicConfig) + } + } + + /** + * returns the topic config, the config has overrides and defaults, if the topic config is not present in the cache + * it will be read from zookeeper and added to the cache. + * @param topic + * @return + */ + def getTopicConfig(topic: String): Properties = { + if(contains(topic)) { + return cache(topic) + } + + populateTopicConfig(topic) + return getTopicConfig(topic) + } +} diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 47295d4..529007a 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -60,6 +60,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient} */ class TopicConfigManager(private val zkClient: ZkClient, private val logManager: LogManager, + private val topicConfigCache: TopicConfigCache, private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L @@ -103,6 +104,7 @@ class TopicConfigManager(private val zkClient: ZkClient, /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties(logManager.defaultConfig.toProps) props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + topicConfigCache.addOrUpdateTopicConfig(topic, props) val logConfig = LogConfig.fromProps(props) for (log <- logsByTopic(topic)) log.config = logConfig diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 93182ae..4043ea4 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -39,10 +39,23 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { val logOpt = this.servers(0).logManager.getLog(tp) assertTrue(logOpt.isDefined) assertEquals(oldVal, logOpt.get.config.flushInterval) + + //check config cache gets populated for a new topic. + val config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertNotNull(config) + assertFalse(config.isEmpty) + assertEquals(oldVal, LogConfig.fromProps(config).flushInterval) } + AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps) TestUtils.retry(10000) { assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval) + + //check config cache was updated with the new values. + val config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertNotNull(config) + assertFalse(config.isEmpty) + assertEquals(newVal, LogConfig.fromProps(config).flushInterval) } } diff --git a/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala b/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala new file mode 100644 index 0000000..8de3245 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/TopicConfigCacheTest.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import java.util.Properties + +import kafka.admin.{AdminOperationException, AdminUtils} +import kafka.cluster.Broker +import kafka.common.TopicAndPartition +import kafka.integration.KafkaServerTestHarness +import kafka.log.LogConfig +import kafka.server.{TopicConfigCache, KafkaConfig, KafkaServer} +import kafka.utils.TestUtils +import junit.framework.Assert._ +import kafka.zk.ZooKeeperTestHarness +import org.scalatest.junit.JUnit3Suite + +class TopicConfigCacheTest extends JUnit3Suite with KafkaServerTestHarness { + + override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) + + def testConfigCache { + var config = this.servers(0).topicConfigCache.getTopicConfig("not-existing-topic") + assertEquals("even for non existing topic we will return default config.",this.servers(0).config.toProps, config) + + //newly created topics should be populated in cache on first request. + val oldVal = 100000 + val tp = TopicAndPartition("test", 0) + AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps) + config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertEquals(oldVal, LogConfig.fromProps(config).flushInterval) + + //test that addOrupdate works + val newVal = 20000 + config = LogConfig(flushInterval = newVal).toProps + this.servers(0).topicConfigCache.addOrUpdateTopicConfig(tp.topic, config) + config = this.servers(0).topicConfigCache.getTopicConfig(tp.topic) + assertEquals(newVal, LogConfig.fromProps(config).flushInterval) + } +} -- 1.9.3 (Apple Git-50)