From 2e88056b92598482cddee4506803ed46b43f854c Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Fri, 19 Jun 2015 17:36:23 -0700 Subject: [PATCH] minor corrections to LogConfig and KafkaConfigTest --- .../apache/kafka/common/config/AbstractConfig.java | 25 +- core/src/main/scala/kafka/log/LogConfig.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 + .../test/scala/unit/kafka/log/LogConfigTest.scala | 22 -- .../kafka/server/KafkaConfigConfigDefTest.scala | 403 --------------------- .../scala/unit/kafka/server/KafkaConfigTest.scala | 276 +++++++++++++- 6 files changed, 299 insertions(+), 433 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index bae528d..e73f87b 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -33,21 +33,26 @@ public class AbstractConfig { private final Set used; /* the original values passed in by the user */ - private final Map originals; + private final Map originals; /* the parsed values */ private final Map values; @SuppressWarnings("unchecked") - public AbstractConfig(ConfigDef definition, Map originals) { + public AbstractConfig(ConfigDef definition, Map originals, Boolean doLog) { /* check that all the keys are really strings */ for (Object key : originals.keySet()) if (!(key instanceof String)) throw new ConfigException(key.toString(), originals.get(key), "Key must be a string."); - this.originals = (Map) originals; + this.originals = (Map) originals; this.values = definition.parse(this.originals); this.used = Collections.synchronizedSet(new HashSet()); - logAll(); + if (doLog) + logAll(); + } + + public AbstractConfig(ConfigDef definition, Map originals) { + this(definition, originals, true); } protected Object get(String key) { @@ -102,12 +107,20 @@ public class AbstractConfig { return copy; } - private void logAll() { + public void logAll() { + logConfig(this.values); + } + + public void logUserDefined() { + logConfig(this.originals); + } + + private void logConfig(Map config) { StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()); b.append(" values: "); b.append(Utils.NL); - for (Map.Entry entry : this.values.entrySet()) { + for (Map.Entry entry : config.entrySet()) { b.append('\t'); b.append(entry.getKey()); b.append(" = "); diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index fc41132..c969d16 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -46,7 +46,7 @@ object Defaults { val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable } -case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) { +case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { val segmentSize = getInt(LogConfig.SegmentBytesProp) val segmentMs = getLong(LogConfig.SegmentMsProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 52dc728..18f0be5 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -395,6 +395,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val defaultLogConfig = LogConfig(defaultProps) val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) + configs.map{case (topic, conf) => { + info("Read configuration for topic " + topic) + conf.logUserDefined() + }} // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 19dcb47..72e98b3 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -33,28 +33,6 @@ class LogConfigTest extends JUnit3Suite { } @Test - def testFromPropsToProps() { - import scala.util.Random._ - val expected = new Properties() - LogConfig.configNames().foreach((name) => { - name match { - case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false")) - case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip")) - case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete)) - case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) - case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString) - case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString) - case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString) - case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false")) - case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) - } - }) - - val actual = LogConfig(expected).originals - Assert.assertEquals(expected, actual) - } - - @Test def testFromPropsInvalid() { LogConfig.configNames().foreach((name) => { name match { diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala deleted file mode 100644 index 98a5b04..0000000 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ /dev/null @@ -1,403 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server - -import java.util.Properties - -import kafka.api.ApiVersion -import kafka.message._ -import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.{Assert, Test} -import org.scalatest.junit.JUnit3Suite - -import scala.collection.Map -import scala.util.Random._ - -class KafkaConfigConfigDefTest extends JUnit3Suite { - - @Test - def testFromPropsEmpty() { - // only required - val p = new Properties() - p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") - val actualConfig = KafkaConfig.fromProps(p) - - val expectedConfig = new KafkaConfig(p) - - Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect) - Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs) - Assert.assertEquals(expectedConfig.zkConnectionTimeoutMs, actualConfig.zkConnectionTimeoutMs) - Assert.assertEquals(expectedConfig.zkSyncTimeMs, actualConfig.zkSyncTimeMs) - Assert.assertEquals(expectedConfig.maxReservedBrokerId, actualConfig.maxReservedBrokerId) - Assert.assertEquals(expectedConfig.brokerId, actualConfig.brokerId) - Assert.assertEquals(expectedConfig.messageMaxBytes, actualConfig.messageMaxBytes) - Assert.assertEquals(expectedConfig.numNetworkThreads, actualConfig.numNetworkThreads) - Assert.assertEquals(expectedConfig.numIoThreads, actualConfig.numIoThreads) - Assert.assertEquals(expectedConfig.backgroundThreads, actualConfig.backgroundThreads) - Assert.assertEquals(expectedConfig.queuedMaxRequests, actualConfig.queuedMaxRequests) - - Assert.assertEquals(expectedConfig.port, actualConfig.port) - Assert.assertEquals(expectedConfig.hostName, actualConfig.hostName) - Assert.assertEquals(expectedConfig.advertisedHostName, actualConfig.advertisedHostName) - Assert.assertEquals(expectedConfig.advertisedPort, actualConfig.advertisedPort) - Assert.assertEquals(expectedConfig.socketSendBufferBytes, actualConfig.socketSendBufferBytes) - Assert.assertEquals(expectedConfig.socketReceiveBufferBytes, actualConfig.socketReceiveBufferBytes) - Assert.assertEquals(expectedConfig.socketRequestMaxBytes, actualConfig.socketRequestMaxBytes) - Assert.assertEquals(expectedConfig.maxConnectionsPerIp, actualConfig.maxConnectionsPerIp) - Assert.assertEquals(expectedConfig.maxConnectionsPerIpOverrides, actualConfig.maxConnectionsPerIpOverrides) - Assert.assertEquals(expectedConfig.connectionsMaxIdleMs, actualConfig.connectionsMaxIdleMs) - - Assert.assertEquals(expectedConfig.numPartitions, actualConfig.numPartitions) - Assert.assertEquals(expectedConfig.logDirs, actualConfig.logDirs) - - Assert.assertEquals(expectedConfig.logSegmentBytes, actualConfig.logSegmentBytes) - - Assert.assertEquals(expectedConfig.logRollTimeMillis, actualConfig.logRollTimeMillis) - Assert.assertEquals(expectedConfig.logRollTimeJitterMillis, actualConfig.logRollTimeJitterMillis) - Assert.assertEquals(expectedConfig.logRetentionTimeMillis, actualConfig.logRetentionTimeMillis) - - Assert.assertEquals(expectedConfig.logRetentionBytes, actualConfig.logRetentionBytes) - Assert.assertEquals(expectedConfig.logCleanupIntervalMs, actualConfig.logCleanupIntervalMs) - Assert.assertEquals(expectedConfig.logCleanupPolicy, actualConfig.logCleanupPolicy) - Assert.assertEquals(expectedConfig.logCleanerThreads, actualConfig.logCleanerThreads) - Assert.assertEquals(expectedConfig.logCleanerIoMaxBytesPerSecond, actualConfig.logCleanerIoMaxBytesPerSecond, 0.0) - Assert.assertEquals(expectedConfig.logCleanerDedupeBufferSize, actualConfig.logCleanerDedupeBufferSize) - Assert.assertEquals(expectedConfig.logCleanerIoBufferSize, actualConfig.logCleanerIoBufferSize) - Assert.assertEquals(expectedConfig.logCleanerDedupeBufferLoadFactor, actualConfig.logCleanerDedupeBufferLoadFactor, 0.0) - Assert.assertEquals(expectedConfig.logCleanerBackoffMs, actualConfig.logCleanerBackoffMs) - Assert.assertEquals(expectedConfig.logCleanerMinCleanRatio, actualConfig.logCleanerMinCleanRatio, 0.0) - Assert.assertEquals(expectedConfig.logCleanerEnable, actualConfig.logCleanerEnable) - Assert.assertEquals(expectedConfig.logCleanerDeleteRetentionMs, actualConfig.logCleanerDeleteRetentionMs) - Assert.assertEquals(expectedConfig.logIndexSizeMaxBytes, actualConfig.logIndexSizeMaxBytes) - Assert.assertEquals(expectedConfig.logIndexIntervalBytes, actualConfig.logIndexIntervalBytes) - Assert.assertEquals(expectedConfig.logFlushIntervalMessages, actualConfig.logFlushIntervalMessages) - Assert.assertEquals(expectedConfig.logDeleteDelayMs, actualConfig.logDeleteDelayMs) - Assert.assertEquals(expectedConfig.logFlushSchedulerIntervalMs, actualConfig.logFlushSchedulerIntervalMs) - Assert.assertEquals(expectedConfig.logFlushIntervalMs, actualConfig.logFlushIntervalMs) - Assert.assertEquals(expectedConfig.logFlushOffsetCheckpointIntervalMs, actualConfig.logFlushOffsetCheckpointIntervalMs) - Assert.assertEquals(expectedConfig.numRecoveryThreadsPerDataDir, actualConfig.numRecoveryThreadsPerDataDir) - Assert.assertEquals(expectedConfig.autoCreateTopicsEnable, actualConfig.autoCreateTopicsEnable) - - Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas) - - Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs) - Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor) - Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs) - Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs) - Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes) - Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes) - Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs) - Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes) - Assert.assertEquals(expectedConfig.replicaFetchBackoffMs, actualConfig.replicaFetchBackoffMs) - Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers) - Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs) - Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests) - Assert.assertEquals(expectedConfig.producerPurgatoryPurgeIntervalRequests, actualConfig.producerPurgatoryPurgeIntervalRequests) - Assert.assertEquals(expectedConfig.autoLeaderRebalanceEnable, actualConfig.autoLeaderRebalanceEnable) - Assert.assertEquals(expectedConfig.leaderImbalancePerBrokerPercentage, actualConfig.leaderImbalancePerBrokerPercentage) - Assert.assertEquals(expectedConfig.leaderImbalanceCheckIntervalSeconds, actualConfig.leaderImbalanceCheckIntervalSeconds) - Assert.assertEquals(expectedConfig.uncleanLeaderElectionEnable, actualConfig.uncleanLeaderElectionEnable) - - Assert.assertEquals(expectedConfig.controlledShutdownMaxRetries, actualConfig.controlledShutdownMaxRetries) - Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs) - Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable) - - Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs) - Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs) - - Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize) - Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize) - Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor) - Assert.assertEquals(expectedConfig.offsetsTopicPartitions, actualConfig.offsetsTopicPartitions) - Assert.assertEquals(expectedConfig.offsetsTopicSegmentBytes, actualConfig.offsetsTopicSegmentBytes) - Assert.assertEquals(expectedConfig.offsetsTopicCompressionCodec, actualConfig.offsetsTopicCompressionCodec) - Assert.assertEquals(expectedConfig.offsetsRetentionMinutes, actualConfig.offsetsRetentionMinutes) - Assert.assertEquals(expectedConfig.offsetsRetentionCheckIntervalMs, actualConfig.offsetsRetentionCheckIntervalMs) - Assert.assertEquals(expectedConfig.offsetCommitTimeoutMs, actualConfig.offsetCommitTimeoutMs) - Assert.assertEquals(expectedConfig.offsetCommitRequiredAcks, actualConfig.offsetCommitRequiredAcks) - - Assert.assertEquals(expectedConfig.deleteTopicEnable, actualConfig.deleteTopicEnable) - Assert.assertEquals(expectedConfig.compressionType, actualConfig.compressionType) - } - - private def atLeastXIntProp(x: Int): String = (nextInt(Int.MaxValue - 1) + x).toString - - private def atLeastOneIntProp: String = atLeastXIntProp(1) - - private def inRangeIntProp(fromInc: Int, toInc: Int): String = (nextInt(toInc + 1 - fromInc) + fromInc).toString - - @Test - def testFromPropsToProps() { - import scala.util.Random._ - val expected = new Properties() - KafkaConfig.configNames().foreach(name => { - name match { - case KafkaConfig.ZkConnectProp => expected.setProperty(name, "127.0.0.1:2181") - case KafkaConfig.ZkSessionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.ZkConnectionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.ZkSyncTimeMsProp => expected.setProperty(name, atLeastOneIntProp) - - case KafkaConfig.NumNetworkThreadsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.NumIoThreadsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.BackgroundThreadsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.QueuedMaxRequestsProp => expected.setProperty(name, atLeastOneIntProp) - - case KafkaConfig.PortProp => expected.setProperty(name, "1234") - case KafkaConfig.HostNameProp => expected.setProperty(name, nextString(10)) - case KafkaConfig.ListenersProp => expected.setProperty(name, "PLAINTEXT://:9092") - case KafkaConfig.AdvertisedHostNameProp => expected.setProperty(name, nextString(10)) - case KafkaConfig.AdvertisedPortProp => expected.setProperty(name, "4321") - case KafkaConfig.AdvertisedListenersProp => expected.setProperty(name, "PLAINTEXT://:2909") - case KafkaConfig.SocketRequestMaxBytesProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.MaxConnectionsPerIpProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.MaxConnectionsPerIpOverridesProp => expected.setProperty(name, "127.0.0.1:2, 127.0.0.2:3") - - case KafkaConfig.NumPartitionsProp => expected.setProperty(name, "2") - case KafkaConfig.LogDirsProp => expected.setProperty(name, "/tmp/logs,/tmp/logs2") - case KafkaConfig.LogDirProp => expected.setProperty(name, "/tmp/log") - case KafkaConfig.LogSegmentBytesProp => expected.setProperty(name, atLeastXIntProp(Message.MinHeaderSize)) - - case KafkaConfig.LogRollTimeMillisProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.LogRollTimeHoursProp => expected.setProperty(name, atLeastOneIntProp) - - case KafkaConfig.LogRetentionTimeMillisProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.LogRetentionTimeMinutesProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.LogRetentionTimeHoursProp => expected.setProperty(name, atLeastOneIntProp) - - case KafkaConfig.LogCleanupIntervalMsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.LogCleanupPolicyProp => expected.setProperty(name, randFrom(Defaults.Compact, Defaults.Delete)) - case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) - case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) - case KafkaConfig.LogCleanerMinCleanRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) - case KafkaConfig.LogCleanerEnableProp => expected.setProperty(name, randFrom("true", "false")) - case KafkaConfig.LogIndexSizeMaxBytesProp => expected.setProperty(name, atLeastXIntProp(4)) - case KafkaConfig.LogFlushIntervalMessagesProp => expected.setProperty(name, atLeastOneIntProp) - - case KafkaConfig.NumRecoveryThreadsPerDataDirProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.AutoCreateTopicsEnableProp => expected.setProperty(name, randFrom("true", "false")) - case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false")) - case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false")) - case KafkaConfig.LogPreAllocateProp => expected.setProperty(name, randFrom("true", "false")) - case KafkaConfig.InterBrokerSecurityProtocolProp => expected.setProperty(name, SecurityProtocol.PLAINTEXT.toString) - case KafkaConfig.InterBrokerProtocolVersionProp => expected.setProperty(name, ApiVersion.latestVersion.toString) - - case KafkaConfig.ControlledShutdownEnableProp => expected.setProperty(name, randFrom("true", "false")) - case KafkaConfig.OffsetsLoadBufferSizeProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.OffsetsTopicPartitionsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.OffsetsTopicSegmentBytesProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.OffsetsTopicCompressionCodecProp => expected.setProperty(name, randFrom(GZIPCompressionCodec.codec.toString, - SnappyCompressionCodec.codec.toString, LZ4CompressionCodec.codec.toString, NoCompressionCodec.codec.toString)) - case KafkaConfig.OffsetsRetentionMinutesProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.OffsetCommitTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.DeleteTopicEnableProp => expected.setProperty(name, randFrom("true", "false")) - - // explicit, non trivial validations or with transient dependencies - - // require(brokerId >= -1 && brokerId <= maxReservedBrokerId) - case KafkaConfig.MaxReservedBrokerIdProp => expected.setProperty(name, "100") - case KafkaConfig.BrokerIdProp => expected.setProperty(name, inRangeIntProp(0, 100)) - // require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024) - case KafkaConfig.LogCleanerThreadsProp => expected.setProperty(name, "2") - case KafkaConfig.LogCleanerDedupeBufferSizeProp => expected.setProperty(name, (1024 * 1024 * 3 + 1).toString) - // require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs) - case KafkaConfig.ReplicaFetchWaitMaxMsProp => expected.setProperty(name, "321") - case KafkaConfig.ReplicaSocketTimeoutMsProp => expected.setProperty(name, atLeastXIntProp(321)) - // require(replicaFetchMaxBytes >= messageMaxBytes) - case KafkaConfig.MessageMaxBytesProp => expected.setProperty(name, "1234") - case KafkaConfig.ReplicaFetchMaxBytesProp => expected.setProperty(name, atLeastXIntProp(1234)) - // require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs) - case KafkaConfig.ReplicaLagTimeMaxMsProp => expected.setProperty(name, atLeastXIntProp(321)) - //require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor) - case KafkaConfig.OffsetCommitRequiredAcksProp => expected.setProperty(name, "-1") - case KafkaConfig.OffsetsTopicReplicationFactorProp => expected.setProperty(name, inRangeIntProp(-1, Short.MaxValue)) - //BrokerCompressionCodec.isValid(compressionType) - case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions)) - - case KafkaConfig.MetricNumSamplesProp => expected.setProperty(name, "2") - case KafkaConfig.MetricSampleWindowMsProp => expected.setProperty(name, "1000") - case KafkaConfig.MetricReporterClassesProp => expected.setProperty(name, "") - - case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) - } - }) - - val actual = KafkaConfig.fromProps(expected).originals - Assert.assertEquals(expected, actual) - } - - @Test - def testFromPropsInvalid() { - def getBaseProperties(): Properties = { - val validRequiredProperties = new Properties() - validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1") - validRequiredProperties - } - // to ensure a basis is valid - bootstraps all needed validation - KafkaConfig.fromProps(getBaseProperties()) - - KafkaConfig.configNames().foreach(name => { - name match { - case KafkaConfig.ZkConnectProp => // ignore string - case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - - case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - - case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.HostNameProp => // ignore string - case KafkaConfig.AdvertisedHostNameProp => //ignore string - case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.MaxConnectionsPerIpOverridesProp => - assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number") - case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - - case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogDirsProp => // ignore string - case KafkaConfig.LogDirProp => // ignore string - case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinHeaderSize - 1) - - case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - - case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - - case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0") - case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024") - case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") - case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3") - case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") - case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") - case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") - - case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - - case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") - case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") - case KafkaConfig.MetricReporterClassesProp => // ignore string - - case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") - } - }) - } - - @Test - def testSpecificProperties(): Unit = { - val defaults = new Properties() - defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") - // For ZkConnectionTimeoutMs - defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234") - defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1") - defaults.put(KafkaConfig.BrokerIdProp, "1") - defaults.put(KafkaConfig.HostNameProp, "127.0.0.1") - defaults.put(KafkaConfig.PortProp, "1122") - defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3") - defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2") - defaults.put(KafkaConfig.LogRollTimeHoursProp, "12") - defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11") - defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10") - //For LogFlushIntervalMsProp - defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123") - defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString) - - val config = KafkaConfig.fromProps(defaults) - Assert.assertEquals("127.0.0.1:2181", config.zkConnect) - Assert.assertEquals(1234, config.zkConnectionTimeoutMs) - Assert.assertEquals(1, config.maxReservedBrokerId) - Assert.assertEquals(1, config.brokerId) - Assert.assertEquals("127.0.0.1", config.hostName) - Assert.assertEquals(1122, config.advertisedPort) - Assert.assertEquals("127.0.0.1", config.advertisedHostName) - Assert.assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) - Assert.assertEquals(List("/tmp1", "/tmp2"), config.logDirs) - Assert.assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) - Assert.assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis) - Assert.assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis) - Assert.assertEquals(123L, config.logFlushIntervalMs) - Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec) - } - - private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { - values.foreach((value) => { - val props = validRequiredProps - props.setProperty(name, value.toString) - intercept[Exception] { - KafkaConfig.fromProps(props) - } - }) - } - - private def randFrom[T](choices: T*): T = { - import scala.util.Random - choices(Random.nextInt(choices.size)) - } - - private def randFrom[T](choices: List[T]): T = { - import scala.util.Random - choices(Random.nextInt(choices.size)) - } -} diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2428dbd..0aea56d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -21,12 +21,15 @@ import java.util.Properties import junit.framework.Assert._ import kafka.api.{ApiVersion, KAFKA_082} +import kafka.message.{SnappyCompressionCodec, Message} import kafka.utils.{TestUtils, CoreUtils} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.Test +import org.junit.{Assert, Test} import org.scalatest.junit.JUnit3Suite +import scala.util.Random._ + class KafkaConfigTest extends JUnit3Suite { @Test @@ -348,4 +351,275 @@ class KafkaConfigTest extends JUnit3Suite { KafkaConfig.fromProps(props) } } + + @Test + def testFromPropsEmpty() { + // only required + val p = new Properties() + p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + val actualConfig = KafkaConfig.fromProps(p) + + val expectedConfig = new KafkaConfig(p) + + Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect) + Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs) + Assert.assertEquals(expectedConfig.zkConnectionTimeoutMs, actualConfig.zkConnectionTimeoutMs) + Assert.assertEquals(expectedConfig.zkSyncTimeMs, actualConfig.zkSyncTimeMs) + Assert.assertEquals(expectedConfig.maxReservedBrokerId, actualConfig.maxReservedBrokerId) + Assert.assertEquals(expectedConfig.brokerId, actualConfig.brokerId) + Assert.assertEquals(expectedConfig.messageMaxBytes, actualConfig.messageMaxBytes) + Assert.assertEquals(expectedConfig.numNetworkThreads, actualConfig.numNetworkThreads) + Assert.assertEquals(expectedConfig.numIoThreads, actualConfig.numIoThreads) + Assert.assertEquals(expectedConfig.backgroundThreads, actualConfig.backgroundThreads) + Assert.assertEquals(expectedConfig.queuedMaxRequests, actualConfig.queuedMaxRequests) + + Assert.assertEquals(expectedConfig.port, actualConfig.port) + Assert.assertEquals(expectedConfig.hostName, actualConfig.hostName) + Assert.assertEquals(expectedConfig.advertisedHostName, actualConfig.advertisedHostName) + Assert.assertEquals(expectedConfig.advertisedPort, actualConfig.advertisedPort) + Assert.assertEquals(expectedConfig.socketSendBufferBytes, actualConfig.socketSendBufferBytes) + Assert.assertEquals(expectedConfig.socketReceiveBufferBytes, actualConfig.socketReceiveBufferBytes) + Assert.assertEquals(expectedConfig.socketRequestMaxBytes, actualConfig.socketRequestMaxBytes) + Assert.assertEquals(expectedConfig.maxConnectionsPerIp, actualConfig.maxConnectionsPerIp) + Assert.assertEquals(expectedConfig.maxConnectionsPerIpOverrides, actualConfig.maxConnectionsPerIpOverrides) + Assert.assertEquals(expectedConfig.connectionsMaxIdleMs, actualConfig.connectionsMaxIdleMs) + + Assert.assertEquals(expectedConfig.numPartitions, actualConfig.numPartitions) + Assert.assertEquals(expectedConfig.logDirs, actualConfig.logDirs) + + Assert.assertEquals(expectedConfig.logSegmentBytes, actualConfig.logSegmentBytes) + + Assert.assertEquals(expectedConfig.logRollTimeMillis, actualConfig.logRollTimeMillis) + Assert.assertEquals(expectedConfig.logRollTimeJitterMillis, actualConfig.logRollTimeJitterMillis) + Assert.assertEquals(expectedConfig.logRetentionTimeMillis, actualConfig.logRetentionTimeMillis) + + Assert.assertEquals(expectedConfig.logRetentionBytes, actualConfig.logRetentionBytes) + Assert.assertEquals(expectedConfig.logCleanupIntervalMs, actualConfig.logCleanupIntervalMs) + Assert.assertEquals(expectedConfig.logCleanupPolicy, actualConfig.logCleanupPolicy) + Assert.assertEquals(expectedConfig.logCleanerThreads, actualConfig.logCleanerThreads) + Assert.assertEquals(expectedConfig.logCleanerIoMaxBytesPerSecond, actualConfig.logCleanerIoMaxBytesPerSecond, 0.0) + Assert.assertEquals(expectedConfig.logCleanerDedupeBufferSize, actualConfig.logCleanerDedupeBufferSize) + Assert.assertEquals(expectedConfig.logCleanerIoBufferSize, actualConfig.logCleanerIoBufferSize) + Assert.assertEquals(expectedConfig.logCleanerDedupeBufferLoadFactor, actualConfig.logCleanerDedupeBufferLoadFactor, 0.0) + Assert.assertEquals(expectedConfig.logCleanerBackoffMs, actualConfig.logCleanerBackoffMs) + Assert.assertEquals(expectedConfig.logCleanerMinCleanRatio, actualConfig.logCleanerMinCleanRatio, 0.0) + Assert.assertEquals(expectedConfig.logCleanerEnable, actualConfig.logCleanerEnable) + Assert.assertEquals(expectedConfig.logCleanerDeleteRetentionMs, actualConfig.logCleanerDeleteRetentionMs) + Assert.assertEquals(expectedConfig.logIndexSizeMaxBytes, actualConfig.logIndexSizeMaxBytes) + Assert.assertEquals(expectedConfig.logIndexIntervalBytes, actualConfig.logIndexIntervalBytes) + Assert.assertEquals(expectedConfig.logFlushIntervalMessages, actualConfig.logFlushIntervalMessages) + Assert.assertEquals(expectedConfig.logDeleteDelayMs, actualConfig.logDeleteDelayMs) + Assert.assertEquals(expectedConfig.logFlushSchedulerIntervalMs, actualConfig.logFlushSchedulerIntervalMs) + Assert.assertEquals(expectedConfig.logFlushIntervalMs, actualConfig.logFlushIntervalMs) + Assert.assertEquals(expectedConfig.logFlushOffsetCheckpointIntervalMs, actualConfig.logFlushOffsetCheckpointIntervalMs) + Assert.assertEquals(expectedConfig.numRecoveryThreadsPerDataDir, actualConfig.numRecoveryThreadsPerDataDir) + Assert.assertEquals(expectedConfig.autoCreateTopicsEnable, actualConfig.autoCreateTopicsEnable) + + Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas) + + Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs) + Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor) + Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs) + Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs) + Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes) + Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes) + Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs) + Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes) + Assert.assertEquals(expectedConfig.replicaFetchBackoffMs, actualConfig.replicaFetchBackoffMs) + Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers) + Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs) + Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests) + Assert.assertEquals(expectedConfig.producerPurgatoryPurgeIntervalRequests, actualConfig.producerPurgatoryPurgeIntervalRequests) + Assert.assertEquals(expectedConfig.autoLeaderRebalanceEnable, actualConfig.autoLeaderRebalanceEnable) + Assert.assertEquals(expectedConfig.leaderImbalancePerBrokerPercentage, actualConfig.leaderImbalancePerBrokerPercentage) + Assert.assertEquals(expectedConfig.leaderImbalanceCheckIntervalSeconds, actualConfig.leaderImbalanceCheckIntervalSeconds) + Assert.assertEquals(expectedConfig.uncleanLeaderElectionEnable, actualConfig.uncleanLeaderElectionEnable) + + Assert.assertEquals(expectedConfig.controlledShutdownMaxRetries, actualConfig.controlledShutdownMaxRetries) + Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs) + Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable) + + Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs) + Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs) + + Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize) + Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize) + Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor) + Assert.assertEquals(expectedConfig.offsetsTopicPartitions, actualConfig.offsetsTopicPartitions) + Assert.assertEquals(expectedConfig.offsetsTopicSegmentBytes, actualConfig.offsetsTopicSegmentBytes) + Assert.assertEquals(expectedConfig.offsetsTopicCompressionCodec, actualConfig.offsetsTopicCompressionCodec) + Assert.assertEquals(expectedConfig.offsetsRetentionMinutes, actualConfig.offsetsRetentionMinutes) + Assert.assertEquals(expectedConfig.offsetsRetentionCheckIntervalMs, actualConfig.offsetsRetentionCheckIntervalMs) + Assert.assertEquals(expectedConfig.offsetCommitTimeoutMs, actualConfig.offsetCommitTimeoutMs) + Assert.assertEquals(expectedConfig.offsetCommitRequiredAcks, actualConfig.offsetCommitRequiredAcks) + + Assert.assertEquals(expectedConfig.deleteTopicEnable, actualConfig.deleteTopicEnable) + Assert.assertEquals(expectedConfig.compressionType, actualConfig.compressionType) + } + + private def atLeastXIntProp(x: Int): String = (nextInt(Int.MaxValue - 1) + x).toString + + private def atLeastOneIntProp: String = atLeastXIntProp(1) + + private def inRangeIntProp(fromInc: Int, toInc: Int): String = (nextInt(toInc + 1 - fromInc) + fromInc).toString + + @Test + def testFromPropsInvalid() { + def getBaseProperties(): Properties = { + val validRequiredProperties = new Properties() + validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1") + validRequiredProperties + } + // to ensure a basis is valid - bootstraps all needed validation + KafkaConfig.fromProps(getBaseProperties()) + + KafkaConfig.configNames().foreach(name => { + name match { + case KafkaConfig.ZkConnectProp => // ignore string + case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + + case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + + case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.HostNameProp => // ignore string + case KafkaConfig.AdvertisedHostNameProp => //ignore string + case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.MaxConnectionsPerIpOverridesProp => + assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number") + case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + + case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.LogDirsProp => // ignore string + case KafkaConfig.LogDirProp => // ignore string + case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinHeaderSize - 1) + + case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + + case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + + case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0") + case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024") + case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") + case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3") + case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") + case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") + case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") + + case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + + case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.MetricReporterClassesProp => // ignore string + + case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") + } + }) + } + + @Test + def testSpecificProperties(): Unit = { + val defaults = new Properties() + defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + // For ZkConnectionTimeoutMs + defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234") + defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1") + defaults.put(KafkaConfig.BrokerIdProp, "1") + defaults.put(KafkaConfig.HostNameProp, "127.0.0.1") + defaults.put(KafkaConfig.PortProp, "1122") + defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3") + defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2") + defaults.put(KafkaConfig.LogRollTimeHoursProp, "12") + defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11") + defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10") + //For LogFlushIntervalMsProp + defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123") + defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString) + + val config = KafkaConfig.fromProps(defaults) + Assert.assertEquals("127.0.0.1:2181", config.zkConnect) + Assert.assertEquals(1234, config.zkConnectionTimeoutMs) + Assert.assertEquals(1, config.maxReservedBrokerId) + Assert.assertEquals(1, config.brokerId) + Assert.assertEquals("127.0.0.1", config.hostName) + Assert.assertEquals(1122, config.advertisedPort) + Assert.assertEquals("127.0.0.1", config.advertisedHostName) + Assert.assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) + Assert.assertEquals(List("/tmp1", "/tmp2"), config.logDirs) + Assert.assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) + Assert.assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis) + Assert.assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis) + Assert.assertEquals(123L, config.logFlushIntervalMs) + Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec) + } + + private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { + values.foreach((value) => { + val props = validRequiredProps + props.setProperty(name, value.toString) + intercept[Exception] { + KafkaConfig.fromProps(props) + } + }) + } + + private def randFrom[T](choices: T*): T = { + import scala.util.Random + choices(Random.nextInt(choices.size)) + } + + private def randFrom[T](choices: List[T]): T = { + import scala.util.Random + choices(Random.nextInt(choices.size)) + } } -- 2.3.2 (Apple Git-55)