diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index d41fd33..94f829c 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -59,7 +59,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient} */ class TopicConfigManager(private val zkClient: ZkClient, private val logManager: LogManager, - private val changeExpirationMs: Long = 10*60*1000, + private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L @@ -86,7 +86,7 @@ class TopicConfigManager(private val zkClient: ZkClient, */ private def processConfigChanges(notifications: Seq[String]) { if (notifications.size > 0) { - info("Processing %d topic config change notification(s)...".format(notifications.size)) + info("Processing config change notification(s)...".format(notifications.size)) val now = time.milliseconds val logs = logManager.logsByTopicPartition.toBuffer val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) @@ -94,26 +94,38 @@ class TopicConfigManager(private val zkClient: ZkClient, val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification - val (topicJson, stat) = ZkUtils.readData(zkClient, changeZnode) - val topic = topicJson.substring(1, topicJson.length - 1) // dequote - if (logsByTopic.contains(topic)) { - /* combine the default properties with the overrides in zk to create the new LogConfig */ - val props = new Properties(logManager.defaultConfig.toProps) - props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) - val logConfig = LogConfig.fromProps(props) - for (log <- logsByTopic(topic)) - log.config = logConfig - lastExecutedChange = changeId - info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) - } else { - if (now - stat.getCtime > changeExpirationMs) { - /* this change is now obsolete, try to delete it unless it is the last change left */ - error("Ignoring topic config change %d for topic %s since the change has expired") - } else { - error("Ignoring topic config change %d for topic %s since the topic may have been deleted") + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification) + if(jsonOpt.isDefined) { + val json = jsonOpt.get + val topic = json.substring(1, json.length - 1) // hacky way to dequote + if (logsByTopic.contains(topic)) { + /* combine the default properties with the overrides in zk to create the new LogConfig */ + val props = new Properties(logManager.defaultConfig.toProps) + props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + val logConfig = LogConfig.fromProps(props) + for (log <- logsByTopic(topic)) + log.config = logConfig + info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) + purgeObsoleteNotifications(now, notifications) } - ZkUtils.deletePath(zkClient, changeZnode) } + lastExecutedChange = changeId + } + } + } + } + + private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { + val eligible = notifications.sorted.dropRight(1) // never purge newest notification--we need it for the seq number + for(notification <- eligible) { + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification) + if(jsonOpt.isDefined) { + val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + if (now - stat.getCtime > changeExpirationMs) { + debug("Purging config change notification " + notification) + ZkUtils.deletePath(zkClient, changeZnode) + } else { + return } } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala new file mode 100644 index 0000000..5a1d5cc --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -0,0 +1,35 @@ +package kafka.server + +import junit.framework.Assert._ +import java.util.Properties +import java.io.File +import org.junit.{After, Before, Test} +import kafka.integration.KafkaServerTestHarness +import kafka.utils._ +import kafka.common._ +import kafka.log.LogConfig +import kafka.admin.AdminUtils +import org.scalatest.junit.JUnit3Suite + +class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { + + override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) + + @Test + def testConfigChange() { + val oldVal = 100000 + val newVal = 200000 + val tp = TopicAndPartition("test", 0) + AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps) + TestUtils.retry(10000) { + val logOpt = this.servers(0).logManager.getLog(tp) + assertTrue(logOpt.isDefined) + assertEquals(oldVal, logOpt.get.config.flushInterval) + } + AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps) + TestUtils.retry(10000) { + assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval) + } + } + +} \ No newline at end of file