From dde1a72657a6028f1afa161b844fd4a2469fa254 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 10 Jun 2015 23:09:10 -0700 Subject: [PATCH] modified KafkaConfig to implement AbstractConfig. This resulted in somewhat cleaner code, and we preserve the original Properties for use by MetricReporter --- .../apache/kafka/common/config/AbstractConfig.java | 10 +- .../scala/kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 4 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 565 ++++++--------------- .../scala/kafka/server/ReplicaFetcherThread.scala | 2 +- .../kafka/server/KafkaConfigConfigDefTest.scala | 18 +- 6 files changed, 155 insertions(+), 446 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index c4fa058..bcad3cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -57,15 +57,19 @@ public class AbstractConfig { return values.get(key); } - public int getInt(String key) { + public Short getShort(String key) { + return (Short) get(key); + } + + public Integer getInt(String key) { return (Integer) get(key); } - public long getLong(String key) { + public Long getLong(String key) { return (Long) get(key); } - public double getDouble(String key) { + public Double getDouble(String key) { return (Double) get(key); } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 69bba24..98711d2 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -325,7 +325,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, - 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS) + 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d63bc18..0a449a5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -428,9 +428,9 @@ class KafkaApis(val requestChannel: RequestChannel, val aliveBrokers = metadataCache.getAliveBrokers val offsetsTopicReplicationFactor = if (aliveBrokers.length > 0) - Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length) + Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length) else - config.offsetsTopicReplicationFactor + config.offsetsTopicReplicationFactor.toInt AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2d75186..7a0a509 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,7 +26,7 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.{mutable, immutable, JavaConversions, Map} @@ -482,14 +482,14 @@ object KafkaConfig { .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc) .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc) .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc) - .define(LeaderImbalanceCheckIntervalSecondsProp, INT, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc) + .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc) .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc) .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc) /** ********* Controlled shutdown configuration ***********/ .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) - .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) + .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc) /** ********* Consumer coordinator configuration ***********/ @@ -520,139 +520,6 @@ object KafkaConfig { } /** - * Parse the given properties instance into a KafkaConfig object - */ - def fromProps(props: Properties): KafkaConfig = { - import kafka.utils.CoreUtils.evaluateDefaults - val parsed = configDef.parse(evaluateDefaults(props)) - new KafkaConfig( - /** ********* Zookeeper Configuration ***********/ - zkConnect = parsed.get(ZkConnectProp).asInstanceOf[String], - zkSessionTimeoutMs = parsed.get(ZkSessionTimeoutMsProp).asInstanceOf[Int], - _zkConnectionTimeoutMs = Option(parsed.get(ZkConnectionTimeoutMsProp)).map(_.asInstanceOf[Int]), - zkSyncTimeMs = parsed.get(ZkSyncTimeMsProp).asInstanceOf[Int], - - /** ********* General Configuration ***********/ - maxReservedBrokerId = parsed.get(MaxReservedBrokerIdProp).asInstanceOf[Int], - brokerId = parsed.get(BrokerIdProp).asInstanceOf[Int], - messageMaxBytes = parsed.get(MessageMaxBytesProp).asInstanceOf[Int], - numNetworkThreads = parsed.get(NumNetworkThreadsProp).asInstanceOf[Int], - numIoThreads = parsed.get(NumIoThreadsProp).asInstanceOf[Int], - backgroundThreads = parsed.get(BackgroundThreadsProp).asInstanceOf[Int], - queuedMaxRequests = parsed.get(QueuedMaxRequestsProp).asInstanceOf[Int], - - /** ********* Socket Server Configuration ***********/ - port = parsed.get(PortProp).asInstanceOf[Int], - hostName = parsed.get(HostNameProp).asInstanceOf[String], - _listeners = Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]), - _advertisedHostName = Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]), - _advertisedPort = Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]), - _advertisedListeners = Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]), - socketSendBufferBytes = parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int], - socketReceiveBufferBytes = parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int], - socketRequestMaxBytes = parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int], - maxConnectionsPerIp = parsed.get(MaxConnectionsPerIpProp).asInstanceOf[Int], - _maxConnectionsPerIpOverrides = parsed.get(MaxConnectionsPerIpOverridesProp).asInstanceOf[String], - connectionsMaxIdleMs = parsed.get(ConnectionsMaxIdleMsProp).asInstanceOf[Long], - - /** ********* Log Configuration ***********/ - numPartitions = parsed.get(NumPartitionsProp).asInstanceOf[Int], - _logDir = parsed.get(LogDirProp).asInstanceOf[String], - _logDirs = Option(parsed.get(LogDirsProp)).map(_.asInstanceOf[String]), - - logSegmentBytes = parsed.get(LogSegmentBytesProp).asInstanceOf[Int], - logRollTimeHours = parsed.get(LogRollTimeHoursProp).asInstanceOf[Int], - _logRollTimeMillis = Option(parsed.get(LogRollTimeMillisProp)).map(_.asInstanceOf[Long]), - - logRollTimeJitterHours = parsed.get(LogRollTimeJitterHoursProp).asInstanceOf[Int], - _logRollTimeJitterMillis = Option(parsed.get(LogRollTimeJitterMillisProp)).map(_.asInstanceOf[Long]), - - logRetentionTimeHours = parsed.get(LogRetentionTimeHoursProp).asInstanceOf[Int], - _logRetentionTimeMins = Option(parsed.get(LogRetentionTimeMinutesProp)).map(_.asInstanceOf[Int]), - _logRetentionTimeMillis = Option(parsed.get(LogRetentionTimeMillisProp)).map(_.asInstanceOf[Long]), - - logRetentionBytes = parsed.get(LogRetentionBytesProp).asInstanceOf[Long], - logCleanupIntervalMs = parsed.get(LogCleanupIntervalMsProp).asInstanceOf[Long], - logCleanupPolicy = parsed.get(LogCleanupPolicyProp).asInstanceOf[String], - logCleanerThreads = parsed.get(LogCleanerThreadsProp).asInstanceOf[Int], - logCleanerIoMaxBytesPerSecond = parsed.get(LogCleanerIoMaxBytesPerSecondProp).asInstanceOf[Double], - logCleanerDedupeBufferSize = parsed.get(LogCleanerDedupeBufferSizeProp).asInstanceOf[Long], - logCleanerIoBufferSize = parsed.get(LogCleanerIoBufferSizeProp).asInstanceOf[Int], - logCleanerDedupeBufferLoadFactor = parsed.get(LogCleanerDedupeBufferLoadFactorProp).asInstanceOf[Double], - logCleanerBackoffMs = parsed.get(LogCleanerBackoffMsProp).asInstanceOf[Long], - logCleanerMinCleanRatio = parsed.get(LogCleanerMinCleanRatioProp).asInstanceOf[Double], - logCleanerEnable = parsed.get(LogCleanerEnableProp).asInstanceOf[Boolean], - logCleanerDeleteRetentionMs = parsed.get(LogCleanerDeleteRetentionMsProp).asInstanceOf[Long], - logIndexSizeMaxBytes = parsed.get(LogIndexSizeMaxBytesProp).asInstanceOf[Int], - logIndexIntervalBytes = parsed.get(LogIndexIntervalBytesProp).asInstanceOf[Int], - logFlushIntervalMessages = parsed.get(LogFlushIntervalMessagesProp).asInstanceOf[Long], - logDeleteDelayMs = parsed.get(LogDeleteDelayMsProp).asInstanceOf[Long], - logFlushSchedulerIntervalMs = parsed.get(LogFlushSchedulerIntervalMsProp).asInstanceOf[Long], - _logFlushIntervalMs = Option(parsed.get(LogFlushIntervalMsProp)).map(_.asInstanceOf[Long]), - logFlushOffsetCheckpointIntervalMs = parsed.get(LogFlushOffsetCheckpointIntervalMsProp).asInstanceOf[Int], - numRecoveryThreadsPerDataDir = parsed.get(NumRecoveryThreadsPerDataDirProp).asInstanceOf[Int], - autoCreateTopicsEnable = parsed.get(AutoCreateTopicsEnableProp).asInstanceOf[Boolean], - minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int], - - /** ********* Replication configuration ***********/ - controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int], - defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int], - replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long], - replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int], - replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int], - replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int], - replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int], - replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int], - replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int], - numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int], - replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long], - fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int], - producerPurgatoryPurgeIntervalRequests = parsed.get(ProducerPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int], - autoLeaderRebalanceEnable = parsed.get(AutoLeaderRebalanceEnableProp).asInstanceOf[Boolean], - leaderImbalancePerBrokerPercentage = parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int], - leaderImbalanceCheckIntervalSeconds = parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int], - uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], - interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]), - interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]), - - /** ********* Controlled shutdown configuration ***********/ - controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int], - controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int], - controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean], - - /** ********* Consumer coordinator configuration ***********/ - consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int], - consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int], - - /** ********* Offset management configuration ***********/ - offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int], - offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int], - offsetsTopicReplicationFactor = parsed.get(OffsetsTopicReplicationFactorProp).asInstanceOf[Short], - offsetsTopicPartitions = parsed.get(OffsetsTopicPartitionsProp).asInstanceOf[Int], - offsetsTopicSegmentBytes = parsed.get(OffsetsTopicSegmentBytesProp).asInstanceOf[Int], - offsetsTopicCompressionCodec = Option(parsed.get(OffsetsTopicCompressionCodecProp)).map(_.asInstanceOf[Int]).map(value => CompressionCodec.getCompressionCodec(value)).orNull, - offsetsRetentionMinutes = parsed.get(OffsetsRetentionMinutesProp).asInstanceOf[Int], - offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long], - offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], - offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], - deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean], - compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String], - metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int], - metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long], - _metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]] - ) - } - - /** - * Create a log config instance using the given properties and defaults - */ - def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = { - val props = new Properties(defaults) - props.putAll(overrides) - fromProps(props) - } - - /** * Check that property names are valid */ def validateNames(props: Properties) { @@ -662,171 +529,149 @@ object KafkaConfig { require(names.contains(name), "Unknown configuration \"%s\".".format(name)) } - /** - * Check that the given properties contain only valid kafka config names and that all values can be parsed and are valid - */ - def validate(props: Properties) { - validateNames(props) - configDef.parse(props) + def fromProps(props: Properties): KafkaConfig = { + KafkaConfig(props) + } - // to bootstrap KafkaConfig.validateValues() - KafkaConfig.fromProps(props) + def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = { + val props = new Properties() + props.putAll(defaults) + props.putAll(overrides) + fromProps(props) } + } -class KafkaConfig (/** ********* Zookeeper Configuration ***********/ - val zkConnect: String, - val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs, - private val _zkConnectionTimeoutMs: Option[Int] = None, - val zkSyncTimeMs: Int = Defaults.ZkSyncTimeMs, - - /** ********* General Configuration ***********/ - val maxReservedBrokerId: Int = Defaults.MaxReservedBrokerId, - var brokerId: Int = Defaults.BrokerId, - val messageMaxBytes: Int = Defaults.MessageMaxBytes, - val numNetworkThreads: Int = Defaults.NumNetworkThreads, - val numIoThreads: Int = Defaults.NumIoThreads, - val backgroundThreads: Int = Defaults.BackgroundThreads, - val queuedMaxRequests: Int = Defaults.QueuedMaxRequests, - - /** ********* Socket Server Configuration ***********/ - val port: Int = Defaults.Port, - val hostName: String = Defaults.HostName, - private val _listeners: Option[String] = None, - private val _advertisedHostName: Option[String] = None, - private val _advertisedPort: Option[Int] = None, - private val _advertisedListeners: Option[String] = None, - val socketSendBufferBytes: Int = Defaults.SocketSendBufferBytes, - val socketReceiveBufferBytes: Int = Defaults.SocketReceiveBufferBytes, - val socketRequestMaxBytes: Int = Defaults.SocketRequestMaxBytes, - val maxConnectionsPerIp: Int = Defaults.MaxConnectionsPerIp, - private val _maxConnectionsPerIpOverrides: String = Defaults.MaxConnectionsPerIpOverrides, - val connectionsMaxIdleMs: Long = Defaults.ConnectionsMaxIdleMs, - - /** ********* Log Configuration ***********/ - val numPartitions: Int = Defaults.NumPartitions, - private val _logDir: String = Defaults.LogDir, - private val _logDirs: Option[String] = None, - - val logSegmentBytes: Int = Defaults.LogSegmentBytes, - - val logRollTimeHours: Int = Defaults.LogRollHours, - private val _logRollTimeMillis: Option[Long] = None, - - val logRollTimeJitterHours: Int = Defaults.LogRollJitterHours, - private val _logRollTimeJitterMillis: Option[Long] = None, - - val logRetentionTimeHours: Int = Defaults.LogRetentionHours, - private val _logRetentionTimeMins: Option[Int] = None, - private val _logRetentionTimeMillis: Option[Long] = None, - - val logRetentionBytes: Long = Defaults.LogRetentionBytes, - val logCleanupIntervalMs: Long = Defaults.LogCleanupIntervalMs, - val logCleanupPolicy: String = Defaults.LogCleanupPolicy, - val logCleanerThreads: Int = Defaults.LogCleanerThreads, - val logCleanerIoMaxBytesPerSecond: Double = Defaults.LogCleanerIoMaxBytesPerSecond, - val logCleanerDedupeBufferSize: Long = Defaults.LogCleanerDedupeBufferSize, - val logCleanerIoBufferSize: Int = Defaults.LogCleanerIoBufferSize, - val logCleanerDedupeBufferLoadFactor: Double = Defaults.LogCleanerDedupeBufferLoadFactor, - val logCleanerBackoffMs: Long = Defaults.LogCleanerBackoffMs, - val logCleanerMinCleanRatio: Double = Defaults.LogCleanerMinCleanRatio, - val logCleanerEnable: Boolean = Defaults.LogCleanerEnable, - val logCleanerDeleteRetentionMs: Long = Defaults.LogCleanerDeleteRetentionMs, - val logIndexSizeMaxBytes: Int = Defaults.LogIndexSizeMaxBytes, - val logIndexIntervalBytes: Int = Defaults.LogIndexIntervalBytes, - val logFlushIntervalMessages: Long = Defaults.LogFlushIntervalMessages, - val logDeleteDelayMs: Long = Defaults.LogDeleteDelayMs, - val logFlushSchedulerIntervalMs: Long = Defaults.LogFlushSchedulerIntervalMs, - private val _logFlushIntervalMs: Option[Long] = None, - val logFlushOffsetCheckpointIntervalMs: Int = Defaults.LogFlushOffsetCheckpointIntervalMs, - val numRecoveryThreadsPerDataDir: Int = Defaults.NumRecoveryThreadsPerDataDir, - val autoCreateTopicsEnable: Boolean = Defaults.AutoCreateTopicsEnable, - - val minInSyncReplicas: Int = Defaults.MinInSyncReplicas, - - /** ********* Replication configuration ***********/ - val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs, - val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor, - val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs, - val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs, - val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes, - val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes, - val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs, - val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes, - val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs, - val numReplicaFetchers: Int = Defaults.NumReplicaFetchers, - val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs, - val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests, - val producerPurgatoryPurgeIntervalRequests: Int = Defaults.ProducerPurgatoryPurgeIntervalRequests, - val autoLeaderRebalanceEnable: Boolean = Defaults.AutoLeaderRebalanceEnable, - val leaderImbalancePerBrokerPercentage: Int = Defaults.LeaderImbalancePerBrokerPercentage, - val leaderImbalanceCheckIntervalSeconds: Int = Defaults.LeaderImbalanceCheckIntervalSeconds, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, - val interBrokerSecurityProtocol: SecurityProtocol = SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol), - val interBrokerProtocolVersion: ApiVersion = ApiVersion(Defaults.InterBrokerProtocolVersion), - - /** ********* Controlled shutdown configuration ***********/ - val controlledShutdownMaxRetries: Int = Defaults.ControlledShutdownMaxRetries, - val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs, - val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable, - - /** ********* Consumer coordinator configuration ***********/ - val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs, - val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs, - - /** ********* Offset management configuration ***********/ - val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize, - val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize, - val offsetsTopicReplicationFactor: Short = Defaults.OffsetsTopicReplicationFactor, - val offsetsTopicPartitions: Int = Defaults.OffsetsTopicPartitions, - val offsetsTopicSegmentBytes: Int = Defaults.OffsetsTopicSegmentBytes, - val offsetsTopicCompressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(Defaults.OffsetsTopicCompressionCodec), - val offsetsRetentionMinutes: Int = Defaults.OffsetsRetentionMinutes, - val offsetsRetentionCheckIntervalMs: Long = Defaults.OffsetsRetentionCheckIntervalMs, - val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs, - val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, - - val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable, - val compressionType: String = Defaults.CompressionType, - - val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs, - val metricNumSamples: Int = Defaults.MetricNumSamples, - private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses) - ) { - - val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs) - - val listeners = getListeners() - val advertisedHostName: String = _advertisedHostName.getOrElse(hostName) - val advertisedPort: Int = _advertisedPort.getOrElse(port) - val advertisedListeners = getAdvertisedListeners() - val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir)) - - val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours) - val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours) - val logRetentionTimeMillis = getLogRetentionTimeMillis +case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) { - val logFlushIntervalMs = _logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs) + /** ********* Zookeeper Configuration ***********/ + val zkConnect: String = getString(KafkaConfig.ZkConnectProp) + val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp) + val zkConnectionTimeoutMs: java.lang.Integer = + Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp)) + val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp) + /** ********* General Configuration ***********/ + val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) + var brokerId: Int = getInt(KafkaConfig.BrokerIdProp) + val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp) + val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp) + val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp) + val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp) + val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp) + + /** ********* Socket Server Configuration ***********/ + val hostName = getString(KafkaConfig.HostNameProp) + val port = getInt(KafkaConfig.PortProp) + val advertisedHostName = Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName) + val advertisedPort: java.lang.Integer = Option(getInt(KafkaConfig.AdvertisedPortProp)).getOrElse(port) + + val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp) + val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp) + val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp) + val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp) val maxConnectionsPerIpOverrides: Map[String, Int] = - getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)} + getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)} + val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp) + - val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses) + /** ********* Log Configuration ***********/ + val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp) + val numPartitions = getInt(KafkaConfig.NumPartitionsProp) + val logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp))) + val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp) + val logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp) + val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp) + val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp) + val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp) + val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong + val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp) + val logCleanupPolicy = getString(KafkaConfig.LogCleanupPolicyProp) + val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp) + val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp) + val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp) + val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp) + val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp) + val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp) + val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp) + val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp) + val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp) + val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp) + val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp) + val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp) + val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp) + val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp) + val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)) + val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)) + val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) + val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) + + /** ********* Replication configuration ***********/ + val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) + val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) + val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp) + val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp) + val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp) + val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp) + val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp) + val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp) + val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp) + val numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp) + val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp) + val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp) + val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp) + val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) + val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) + val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) + val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) + val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp)) + val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp)) + + /** ********* Controlled shutdown configuration ***********/ + val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp) + val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp) + val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp) + + /** ********* Consumer coordinator configuration ***********/ + val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp) + val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp) + + /** ********* Offset management configuration ***********/ + val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp) + val offsetsLoadBufferSize = getInt(KafkaConfig.OffsetsLoadBufferSizeProp) + val offsetsTopicReplicationFactor = getShort(KafkaConfig.OffsetsTopicReplicationFactorProp) + val offsetsTopicPartitions = getInt(KafkaConfig.OffsetsTopicPartitionsProp) + val offsetCommitTimeoutMs = getInt(KafkaConfig.OffsetCommitTimeoutMsProp) + val offsetCommitRequiredAcks = getShort(KafkaConfig.OffsetCommitRequiredAcksProp) + val offsetsTopicSegmentBytes = getInt(KafkaConfig.OffsetsTopicSegmentBytesProp) + val offsetsTopicCompressionCodec = Option(getInt(KafkaConfig.OffsetsTopicCompressionCodecProp)).map(value => CompressionCodec.getCompressionCodec(value)).orNull + + /** ********* Metric Configuration **************/ + val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp) + val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) + val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter]) + + val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) + val compressionType = getString(KafkaConfig.CompressionTypeProp) + + + val listeners = getListeners + val advertisedListeners = getAdvertisedListeners + val logRetentionTimeMillis = getLogRetentionTimeMillis private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute - val millis = { - _logRetentionTimeMillis.getOrElse( - _logRetentionTimeMins match { - case Some(mins) => millisInMinute * mins - case None => millisInHour * logRetentionTimeHours - } - ) - } - if (millis < 0) return -1 - millis + val millis: java.lang.Long = + Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse( + Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match { + case Some(mins) => millisInMinute * mins + case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour + }) + + if (millis < 0) return -1 + millis } private def getMap(propName: String, propValue: String): Map[String, String] = { @@ -855,9 +700,9 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ // If the user did not define listeners but did define host or port, let's use them in backward compatible way // If none of those are defined, we default to PLAINTEXT://:9092 private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = { - if (_listeners.isDefined) { - validateUniquePortAndProtocol(_listeners.get) - CoreUtils.listenerListToEndPoints(_listeners.get) + if (getString(KafkaConfig.ListenersProp) != null) { + validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp)) + CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp)) } else { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port) } @@ -867,11 +712,12 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults // If none of these are defined, we'll use the listeners private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = { - if (_advertisedListeners.isDefined) { - validateUniquePortAndProtocol(_advertisedListeners.get) - CoreUtils.listenerListToEndPoints(_advertisedListeners.get) - } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) { - CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort) + if (getString(KafkaConfig.AdvertisedListenersProp) != null) { + validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp)) + CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp)) + } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { + CoreUtils.listenerListToEndPoints("PLAINTEXT://" + + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) } else { getListeners() } @@ -895,19 +741,13 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ } - - validateValues() private def validateValues() { require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id") require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0") - - require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1") - require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1") require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1") - require(logDirs.size > 0) require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" + @@ -922,125 +762,6 @@ class KafkaConfig (/** ********* Zookeeper Configuration ***********/ } def toProps: Properties = { - val props = new Properties() - import kafka.server.KafkaConfig._ - /** ********* Zookeeper Configuration ***********/ - props.put(ZkConnectProp, zkConnect) - props.put(ZkSessionTimeoutMsProp, zkSessionTimeoutMs.toString) - _zkConnectionTimeoutMs.foreach(value => props.put(ZkConnectionTimeoutMsProp, value.toString)) - props.put(ZkSyncTimeMsProp, zkSyncTimeMs.toString) - - /** ********* General Configuration ***********/ - props.put(MaxReservedBrokerIdProp, maxReservedBrokerId.toString) - props.put(BrokerIdProp, brokerId.toString) - props.put(MessageMaxBytesProp, messageMaxBytes.toString) - props.put(NumNetworkThreadsProp, numNetworkThreads.toString) - props.put(NumIoThreadsProp, numIoThreads.toString) - props.put(BackgroundThreadsProp, backgroundThreads.toString) - props.put(QueuedMaxRequestsProp, queuedMaxRequests.toString) - - /** ********* Socket Server Configuration ***********/ - props.put(PortProp, port.toString) - props.put(HostNameProp, hostName) - _listeners.foreach(props.put(ListenersProp, _)) - _advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _)) - _advertisedPort.foreach(value => props.put(AdvertisedPortProp, value.toString)) - _advertisedListeners.foreach(props.put(AdvertisedListenersProp, _)) - props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString) - props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString) - props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString) - props.put(MaxConnectionsPerIpProp, maxConnectionsPerIp.toString) - props.put(MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides) - props.put(ConnectionsMaxIdleMsProp, connectionsMaxIdleMs.toString) - - /** ********* Log Configuration ***********/ - props.put(NumPartitionsProp, numPartitions.toString) - props.put(LogDirProp, _logDir) - _logDirs.foreach(value => props.put(LogDirsProp, value)) - props.put(LogSegmentBytesProp, logSegmentBytes.toString) - - props.put(LogRollTimeHoursProp, logRollTimeHours.toString) - _logRollTimeMillis.foreach(v => props.put(LogRollTimeMillisProp, v.toString)) - - props.put(LogRollTimeJitterHoursProp, logRollTimeJitterHours.toString) - _logRollTimeJitterMillis.foreach(v => props.put(LogRollTimeJitterMillisProp, v.toString)) - - - props.put(LogRetentionTimeHoursProp, logRetentionTimeHours.toString) - _logRetentionTimeMins.foreach(v => props.put(LogRetentionTimeMinutesProp, v.toString)) - _logRetentionTimeMillis.foreach(v => props.put(LogRetentionTimeMillisProp, v.toString)) - - props.put(LogRetentionBytesProp, logRetentionBytes.toString) - props.put(LogCleanupIntervalMsProp, logCleanupIntervalMs.toString) - props.put(LogCleanupPolicyProp, logCleanupPolicy) - props.put(LogCleanerThreadsProp, logCleanerThreads.toString) - props.put(LogCleanerIoMaxBytesPerSecondProp, logCleanerIoMaxBytesPerSecond.toString) - props.put(LogCleanerDedupeBufferSizeProp, logCleanerDedupeBufferSize.toString) - props.put(LogCleanerIoBufferSizeProp, logCleanerIoBufferSize.toString) - props.put(LogCleanerDedupeBufferLoadFactorProp, logCleanerDedupeBufferLoadFactor.toString) - props.put(LogCleanerBackoffMsProp, logCleanerBackoffMs.toString) - props.put(LogCleanerMinCleanRatioProp, logCleanerMinCleanRatio.toString) - props.put(LogCleanerEnableProp, logCleanerEnable.toString) - props.put(LogCleanerDeleteRetentionMsProp, logCleanerDeleteRetentionMs.toString) - props.put(LogIndexSizeMaxBytesProp, logIndexSizeMaxBytes.toString) - props.put(LogIndexIntervalBytesProp, logIndexIntervalBytes.toString) - props.put(LogFlushIntervalMessagesProp, logFlushIntervalMessages.toString) - props.put(LogDeleteDelayMsProp, logDeleteDelayMs.toString) - props.put(LogFlushSchedulerIntervalMsProp, logFlushSchedulerIntervalMs.toString) - _logFlushIntervalMs.foreach(v => props.put(LogFlushIntervalMsProp, v.toString)) - props.put(LogFlushOffsetCheckpointIntervalMsProp, logFlushOffsetCheckpointIntervalMs.toString) - props.put(NumRecoveryThreadsPerDataDirProp, numRecoveryThreadsPerDataDir.toString) - props.put(AutoCreateTopicsEnableProp, autoCreateTopicsEnable.toString) - props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) - - /** ********* Replication configuration ***********/ - props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString) - props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString) - props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) - props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString) - props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString) - props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString) - props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) - props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString) - props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString) - props.put(NumReplicaFetchersProp, numReplicaFetchers.toString) - props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString) - props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString) - props.put(ProducerPurgatoryPurgeIntervalRequestsProp, producerPurgatoryPurgeIntervalRequests.toString) - props.put(AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString) - props.put(LeaderImbalancePerBrokerPercentageProp, leaderImbalancePerBrokerPercentage.toString) - props.put(LeaderImbalanceCheckIntervalSecondsProp, leaderImbalanceCheckIntervalSeconds.toString) - props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) - props.put(InterBrokerSecurityProtocolProp, interBrokerSecurityProtocol.toString) - props.put(InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString) - - - /** ********* Controlled shutdown configuration ***********/ - props.put(ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString) - props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString) - props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString) - - /** ********* Consumer coordinator configuration ***********/ - props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString) - props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString) - - /** ********* Offset management configuration ***********/ - props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString) - props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString) - props.put(OffsetsTopicReplicationFactorProp, offsetsTopicReplicationFactor.toString) - props.put(OffsetsTopicPartitionsProp, offsetsTopicPartitions.toString) - props.put(OffsetsTopicSegmentBytesProp, offsetsTopicSegmentBytes.toString) - props.put(OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString) - props.put(OffsetsRetentionMinutesProp, offsetsRetentionMinutes.toString) - props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString) - props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString) - props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) - props.put(DeleteTopicEnableProp, deleteTopicEnable.toString) - props.put(CompressionTypeProp, compressionType.toString) - props.put(MetricNumSamplesProp, metricNumSamples.toString) - props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString) - props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(",")) - - props + props.asInstanceOf[Properties] } } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b31b432..b7956bd 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String, // any logic for partitions whose leader has changed def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { - delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs) + delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index ace6321..3ea4ea8 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -31,29 +31,13 @@ import scala.util.Random._ class KafkaConfigConfigDefTest extends JUnit3Suite { @Test - def testFromPropsDefaults() { - val defaults = new Properties() - defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") - - // some ordinary setting - defaults.put(KafkaConfig.AdvertisedPortProp, "1818") - - val props = new Properties(defaults) - - val config = KafkaConfig.fromProps(props) - - Assert.assertEquals(1818, config.advertisedPort) - Assert.assertEquals("KafkaConfig defaults should be retained", Defaults.ConnectionsMaxIdleMs, config.connectionsMaxIdleMs) - } - - @Test def testFromPropsEmpty() { // only required val p = new Properties() p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") val actualConfig = KafkaConfig.fromProps(p) - val expectedConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181") + val expectedConfig = new KafkaConfig(p) Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect) Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs) -- 2.3.2 (Apple Git-55)