diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index 4b1d117..85e5018 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -18,24 +18,26 @@ package kafka.etl.impl; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.Random; import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLRequest; import kafka.etl.Props; import kafka.javaapi.producer.Producer; import kafka.message.Message; -import kafka.producer.ProducerConfig; import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.producer.SyncProducerConfigShared$; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapred.JobConf; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; + /** * Use this class to produce test events to Kafka server. Each event contains a * random timestamp in text format. @@ -53,8 +55,6 @@ public class DataGenerator { protected int _count; protected String _offsetsDir; protected final int TCP_BUFFER_SIZE = 300 * 1000; - protected final int CONNECT_TIMEOUT = 20000; // ms - protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms public DataGenerator(String id, Props props) throws Exception { _props = props; @@ -63,20 +63,18 @@ public class DataGenerator { _count = props.getInt("event.count"); _offsetsDir = _props.getProperty("input"); - + // initialize kafka producer to generate count events String serverUri = _props.getProperty("kafka.server.uri"); _uri = new URI (serverUri); - + System.out.println("server uri:" + _uri.toString()); Properties producerProps = new Properties(); - producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort())); - producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE)); - producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT)); - producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL)); - + producerProps.put(ProducerConfig.MetadataBrokerListProp(), String.format("%s:%d", _uri.getHost(), _uri.getPort())); + producerProps.put(SyncProducerConfigShared$.MODULE$.SendBufferBytesProp(), String.valueOf(TCP_BUFFER_SIZE)); + _producer = new Producer(new ProducerConfig(producerProps)); - + } public void run() throws Exception { @@ -95,7 +93,7 @@ public class DataGenerator { // close the producer _producer.close(); - + // generate offset files generateOffsets(); } @@ -107,7 +105,7 @@ public class DataGenerator { Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat"); FileSystem fs = outPath.getFileSystem(conf); if (fs.exists(outPath)) fs.delete(outPath); - + KafkaETLRequest request = new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0); @@ -115,12 +113,12 @@ public class DataGenerator { byte[] bytes = 
request.toString().getBytes("UTF-8"); KafkaETLKey dummyKey = new KafkaETLKey(); SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath, + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath, KafkaETLKey.class, BytesWritable.class); writer.append(dummyKey, new BytesWritable(bytes)); writer.close(); } - + public static void main(String[] args) throws Exception { if (args.length < 1) diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index 32f096c..db9a0d0 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -16,25 +16,23 @@ */ package kafka.bridge.hadoop; -import java.io.IOException; -import java.net.URI; -import java.util.*; - import kafka.common.KafkaException; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; - +import kafka.producer.SyncProducerConfigShared$; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.log4j.Logger; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + public class KafkaOutputFormat extends OutputFormat { private Logger log = Logger.getLogger(KafkaOutputFormat.class); @@ -43,15 +41,15 @@ public class KafkaOutputFormat extends OutputFormat /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */ public static final int KAFKA_QUEUE_SIZE = 10*1024*1024; - public static final String KAFKA_CONFIG_PREFIX = "kafka.output"; + public static final String KAFKA_CONFIG_PREFIX = "kafka.output."; private static final Map kafkaConfigMap; static { Map cMap = new HashMap(); // default Hadoop producer configs - cMap.put("producer.type", "sync"); - cMap.put("send.buffer.bytes", Integer.toString(64*1024)); - cMap.put("compression.codec", Integer.toString(1)); + cMap.put(ProducerConfig.ProducerTypeProp(), "sync"); + cMap.put(SyncProducerConfigShared$.MODULE$.SendBufferBytesProp(), Integer.toString(64*1024)); + cMap.put(ProducerConfig.CompressionCodecProp(), Integer.toString(1)); kafkaConfigMap = Collections.unmodifiableMap(cMap); } @@ -106,32 +104,32 @@ public class KafkaOutputFormat extends OutputFormat if (m.getKey().equals(KAFKA_URL)) continue; - String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1); + String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()); props.setProperty(kafkaKeyName, m.getValue()); // set Kafka producer property } // inject Kafka producer props back into jobconf for easier debugging for (Map.Entry m : props.entrySet()) { - job.set(KAFKA_CONFIG_PREFIX + "." 
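Note on the two contrib hunks above (DataGenerator and KafkaOutputFormat): they only swap hard-coded config strings for the new constants; the key names themselves are unchanged, so existing producer .properties files keep working. A minimal Scala sanity check of that invariant, assuming this patch is applied (from Java, the same constants are reached through the generated accessors such as SyncProducerConfigShared$.MODULE$.SendBufferBytesProp(), as the hunks above show):

import kafka.producer.{ProducerConfig, SyncProducerConfigShared}

// the new constants resolve to exactly the strings the code used before
assert(ProducerConfig.MetadataBrokerListProp == "metadata.broker.list")
assert(SyncProducerConfigShared.SendBufferBytesProp == "send.buffer.bytes")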
+ m.getKey().toString(), m.getValue().toString()); + job.set(KAFKA_CONFIG_PREFIX + m.getKey().toString(), m.getValue().toString()); } // KafkaOutputFormat specific parameters - final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE); - job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize); + final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + "queue.size", KAFKA_QUEUE_SIZE); + job.setInt(KAFKA_CONFIG_PREFIX + "queue.size", queueSize); if (uri.getScheme().equals("kafka")) { // using the direct broker list // URL: kafka:/// // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar String brokerList = uri.getAuthority(); - props.setProperty("metadata.broker.list", brokerList); - job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList); + props.setProperty(ProducerConfig.MetadataBrokerListProp(), brokerList); + job.set(KAFKA_CONFIG_PREFIX + ProducerConfig.MetadataBrokerListProp(), brokerList); if (uri.getPath() == null || uri.getPath().length() <= 1) throw new KafkaException("no topic specified in kafka uri"); topic = uri.getPath().substring(1); // ignore the initial '/' in the path - job.set(KAFKA_CONFIG_PREFIX + ".topic", topic); + job.set(KAFKA_CONFIG_PREFIX + "topic", topic); log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic)); } else throw new KafkaException("missing scheme from kafka uri (must be kafka://)"); diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index cc526ec..509d990 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -24,8 +24,9 @@ import kafka.common.KafkaException import kafka.utils.{Utils, Logging} import java.util.Properties import util.Random + import kafka.producer.async.AsyncProducerConfig -/** + /** * Helper functions common to clients (producer, consumer, or admin) */ object ClientUtils extends Logging{ @@ -33,7 +34,7 @@ object ClientUtils extends Logging{ /** * Used by the producer to send a metadata request since it has access to the ProducerConfig * @param topics The topics for which the metadata needs to be fetched - * @param brokers The brokers in the cluster as configured on the producer through metadata.broker.list + * @param brokers The brokers in the cluster as configured on the producer through ProducerConfig.MetadataBrokerListProp * @param producerConfig The producer's config * @return topic metadata response */ @@ -81,15 +82,15 @@ object ClientUtils extends Logging{ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int, correlationId: Int = 0): TopicMetadataResponse = { val props = new Properties() - props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(",")) - props.put("client.id", clientId) - props.put("request.timeout.ms", timeoutMs.toString) + props.put(ProducerConfig.MetadataBrokerListProp, brokers.map(_.getConnectionString()).mkString(",")) + props.put(SyncProducerConfigShared.ClientIdProp, clientId) + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, timeoutMs.toString) val producerConfig = new ProducerConfig(props) fetchTopicMetadata(topics, brokers, producerConfig, correlationId) } /** - * Parse a list of broker urls in the form host1:port1, host2:port2, ... + * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
*/ def parseBrokerList(brokerListStr: String): Seq[Broker] = { val brokersStr = Utils.parseCsvList(brokerListStr) @@ -103,5 +104,5 @@ object ClientUtils extends Logging{ new Broker(brokerId, hostName, port) }) } - + } \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 719beb5..83bbff2 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -26,7 +26,7 @@ import java.io.PrintStream import kafka.message._ import kafka.serializer._ import kafka.utils._ -import kafka.metrics.KafkaMetricsReporter +import kafka.metrics.{KafkaCSVMetricsReporter, KafkaMetricsConfig, KafkaMetricsReporter} /** @@ -142,30 +142,30 @@ object ConsoleConsumer extends Logging { val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) if (csvMetricsReporterEnabled) { val csvReporterProps = new Properties() - csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") - csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + csvReporterProps.put(KafkaMetricsConfig.PollingIntervalSecs, "5") + csvReporterProps.put(KafkaMetricsConfig.Reporters, "kafka.metrics.KafkaCSVMetricsReporter") if (options.has(metricsDirectoryOpt)) - csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + csvReporterProps.put(KafkaCSVMetricsReporter.MetricsDir, options.valueOf(metricsDirectoryOpt)) else - csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") - csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") + csvReporterProps.put(KafkaCSVMetricsReporter.MetricsDir, "kafka_metrics") + csvReporterProps.put(KafkaCSVMetricsReporter.ReporterEnabled, "true") val verifiableProps = new VerifiableProperties(csvReporterProps) KafkaMetricsReporter.startReporters(verifiableProps) } val props = new Properties() - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString) - props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString) - props.put("auto.commit.enable", "true") - props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) - props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) - props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString) + props.put(ConsumerConfig.GroupIdProp, options.valueOf(groupIdOpt)) + props.put(ConsumerConfig.SocketReceiveBufferBytesProp, options.valueOf(socketBufferSizeOpt).toString) + props.put(ConsumerConfig.SocketTimeoutMsProp, options.valueOf(socketTimeoutMsOpt).toString) + props.put(ConsumerConfig.FetchMessageMaxBytesProp, options.valueOf(fetchSizeOpt).toString) + props.put(ConsumerConfig.FetchMinBytesProp, options.valueOf(minFetchBytesOpt).toString) + props.put(ConsumerConfig.FetchWaitMaxMsProp, options.valueOf(maxWaitMsOpt).toString) + props.put(ConsumerConfig.AutoCommitEnableProp, "true") + props.put(ConsumerConfig.AutoCommitIntervalMsProp, 
options.valueOf(autoCommitIntervalOpt).toString) + props.put(ConsumerConfig.AutoOffsetResetProp, if(options.has(resetBeginningOpt)) "smallest" else "largest") + props.put(ZKConfig.ZkConnectProp, options.valueOf(zkConnectOpt)) + props.put(ConsumerConfig.ConsumerTimeoutMsProp, options.valueOf(consumerTimeoutMsOpt).toString) + props.put(ConsumerConfig.RefreshLeaderBackoffMsProp, options.valueOf(refreshMetadataBackoffMsOpt).toString) val config = new ConsumerConfig(props) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false @@ -184,7 +184,7 @@ object ConsoleConsumer extends Logging { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!options.has(groupIdOpt)) + if(!options.has(groupIdOpt)) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) } }) @@ -277,7 +277,7 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var keySeparator = "\t".getBytes var lineSeparator = "\n".getBytes - + override def init(props: Properties) { if(props.containsKey("print.key")) printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") @@ -286,7 +286,7 @@ class DefaultMessageFormatter extends MessageFormatter { if(props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes } - + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { if(printKey) { output.write(key) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c8c4212..8a76c63 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
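For reference, a minimal Scala sketch of the consumer-side pattern the ConsoleConsumer hunk above follows, assuming this patch is applied; the group id and ZooKeeper address are illustrative values, and ZKConfig.ZkConnectProp is the constant this patch uses elsewhere for zookeeper.connect:

import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.utils.ZKConfig

val props = new Properties()
props.put(ConsumerConfig.GroupIdProp, "console-consumer")             // illustrative group id
props.put(ZKConfig.ZkConnectProp, "localhost:2181")                   // illustrative ZooKeeper address
props.put(ConsumerConfig.AutoOffsetResetProp, "smallest")             // start from the earliest offset
props.put(ConsumerConfig.FetchMessageMaxBytesProp, (1024 * 1024).toString)
val consumerConfig = new ConsumerConfig(props)                        // validates group/client ids and auto.offset.reset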
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -41,9 +41,26 @@ object ConsumerConfig extends Config { val MirrorTopicsBlacklist = "" val MirrorConsumerNumThreads = 1 - val MirrorTopicsWhitelistProp = "mirror.topics.whitelist" - val MirrorTopicsBlacklistProp = "mirror.topics.blacklist" - val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" + final val MirrorTopicsWhitelistProp = "mirror.topics.whitelist" + final val MirrorTopicsBlacklistProp = "mirror.topics.blacklist" + final val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" + final val GroupIdProp = "group.id" + final val ClientIdProp = "client.id" + final val ConsumerIdProp = "consumer.id" + final val SocketTimeoutMsProp = "socket.timeout.ms" + final val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" + final val FetchMessageMaxBytesProp = "fetch.message.max.bytes" + final val AutoCommitEnableProp = "auto.commit.enable" + final val AutoCommitIntervalMsProp = "auto.commit.interval.ms" + final val QueuedMaxMessageChunksProp = "queued.max.message.chunks" + final val RebalanceMaxRetriesProp = "rebalance.max.retries" + final val FetchMinBytesProp = "fetch.min.bytes" + final val FetchWaitMaxMsProp = "fetch.wait.max.ms" + final val RebalanceBackoffMsProp = "rebalance.backoff.ms" + final val RefreshLeaderBackoffMsProp = "refresh.leader.backoff.ms" + final val AutoOffsetResetProp = "auto.offset.reset" + final val ConsumerTimeoutMsProp = "consumer.timeout.ms" + val DefaultClientId = "" def validate(config: ConsumerConfig) { @@ -53,11 +70,11 @@ object ConsumerConfig extends Config { } def validateClientId(clientId: String) { - validateChars("client.id", clientId) + validateChars(ClientIdProp, clientId) } def validateGroupId(groupId: String) { - validateChars("group.id", groupId) + validateChars(GroupIdProp, groupId) } def validateAutoOffsetReset(autoOffsetReset: String) { @@ -79,58 +96,58 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( } /** a string that uniquely identifies a set of consumers within the same consumer group */ - val groupId = props.getString("group.id") + val groupId = props.getString(GroupIdProp) /** consumer id: generated automatically if not set. * Set this explicitly for only testing purpose. */ - val consumerId: Option[String] = Option(props.getString("consumer.id", null)) + val consumerId: Option[String] = Option(props.getString(ConsumerIdProp, null)) /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. 
*/ - val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) - + val socketTimeoutMs = props.getInt(SocketTimeoutMsProp, SocketTimeout) + /** the socket receive buffer for network requests */ - val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize) - + val socketReceiveBufferBytes = props.getInt(SocketReceiveBufferBytesProp, SocketBufferSize) + /** the number of byes of messages to attempt to fetch */ - val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize) - + val fetchMessageMaxBytes = props.getInt(FetchMessageMaxBytesProp, FetchSize) + /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */ - val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit) - + val autoCommitEnable = props.getBoolean(AutoCommitEnableProp, AutoCommit) + /** the frequency in ms that the consumer offsets are committed to zookeeper */ - val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval) + val autoCommitIntervalMs = props.getInt(AutoCommitIntervalMsProp, AutoCommitInterval) /** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/ - val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks) + val queuedMaxMessages = props.getInt(QueuedMaxMessageChunksProp, MaxQueuedChunks) /** max number of retries during rebalance */ - val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries) - + val rebalanceMaxRetries = props.getInt(RebalanceMaxRetriesProp, MaxRebalanceRetries) + /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */ - val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes) - + val fetchMinBytes = props.getInt(FetchMinBytesProp, MinFetchBytes) + /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */ - val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs) - + val fetchWaitMaxMs = props.getInt(FetchWaitMaxMsProp, MaxFetchWaitMs) + /** backoff time between retries during rebalance */ - val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs) + val rebalanceBackoffMs = props.getInt(RebalanceBackoffMsProp, zkSyncTimeMs) /** backoff time to refresh the leader of a partition after it loses the current leader */ - val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs) + val refreshLeaderBackoffMs = props.getInt(RefreshLeaderBackoffMsProp, RefreshMetadataBackoffMs) /* what to do if an offset is out of range. 
smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset anything else: throw exception to the consumer */ - val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset) + val autoOffsetReset = props.getString(AutoOffsetResetProp, AutoOffsetReset) /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ - val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs) + val consumerTimeoutMs = props.getInt(ConsumerTimeoutMsProp, ConsumerTimeoutMs) /** * Client id is specified by the kafka consumer client, used to distinguish different clients */ - val clientId = props.getString("client.id", groupId) + val clientId = props.getString(ClientIdProp, groupId) validate(this) } diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala index ea9559f..ce9c2b0 100644 --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -29,10 +29,15 @@ import kafka.utils.{Utils, VerifiableProperties, Logging} private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean +object KafkaCSVMetricsReporter { + val MetricsDir = "kafka.csv.metrics.dir" + val ReporterEnabled = "kafka.csv.metrics.reporter.enabled" +} private class KafkaCSVMetricsReporter extends KafkaMetricsReporter with KafkaCSVMetricsReporterMBean with Logging { + import KafkaCSVMetricsReporter._ private var csvDir: File = null private var underlying: CsvReporter = null @@ -47,11 +52,11 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter synchronized { if (!initialized) { val metricsConfig = new KafkaMetricsConfig(props) - csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics")) + csvDir = new File(props.getString(MetricsDir, "kafka_metrics")) Utils.rm(csvDir) csvDir.mkdirs() underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) - if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) { + if (props.getBoolean(ReporterEnabled, default = false)) { initialized = true startReporter(metricsConfig.pollingIntervalSecs) } diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala index 84f6208..d159544 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -22,16 +22,22 @@ package kafka.metrics import kafka.utils.{VerifiableProperties, Utils} +object KafkaMetricsConfig { + val Reporters = "kafka.metrics.reporters" + val PollingIntervalSecs = "kafka.metrics.polling.interval.secs" +} + class KafkaMetricsConfig(props: VerifiableProperties) { + import KafkaMetricsConfig._ /** * Comma-separated list of reporter types. These classes should be on the * classpath and will be instantiated at run-time. */ - val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", "")) + val reporters = Utils.parseCsvList(props.getString(Reporters, "")) /** * The metrics polling interval (in seconds). 
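A compact Scala sketch of how the two metrics classes above are wired together, mirroring the ConsoleConsumer hunk earlier in this patch and assuming the patch is applied; the polling interval and output directory are illustrative values:

import java.util.Properties
import kafka.metrics.{KafkaCSVMetricsReporter, KafkaMetricsConfig, KafkaMetricsReporter}
import kafka.utils.VerifiableProperties

val reporterProps = new Properties()
reporterProps.put(KafkaMetricsConfig.Reporters, "kafka.metrics.KafkaCSVMetricsReporter")
reporterProps.put(KafkaMetricsConfig.PollingIntervalSecs, "5")                // illustrative polling interval (secs)
reporterProps.put(KafkaCSVMetricsReporter.MetricsDir, "kafka_metrics")        // illustrative output directory
reporterProps.put(KafkaCSVMetricsReporter.ReporterEnabled, "true")
KafkaMetricsReporter.startReporters(new VerifiableProperties(reporterProps))  // starts the CSV reporter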
*/ - val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10) + val pollingIntervalSecs = props.getInt(PollingIntervalSecs, 10) } diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 5539bce..4c7e4aa 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -24,10 +24,11 @@ import java.io._ import kafka.common._ import kafka.message._ import kafka.serializer._ +import kafka.producer.async.AsyncProducerConfig -object ConsoleProducer { +object ConsoleProducer { - def main(args: Array[String]) { + def main(args: Array[String]) { val parser = new OptionParser val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") .withRequiredArg @@ -44,13 +45,13 @@ object ConsoleProducer { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(200) - val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + + val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + " a message will queue awaiting suffient batch size. The value is given in ms.") .withRequiredArg .describedAs("timeout_ms") .ofType(classOf[java.lang.Long]) .defaultsTo(1000) - val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + + val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + " messages will queue awaiting suffient batch size.") .withRequiredArg .describedAs("queue_size") @@ -81,7 +82,7 @@ object ConsoleProducer { .describedAs("encoder_class") .ofType(classOf[java.lang.String]) .defaultsTo(classOf[StringEncoder].getName) - val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + + val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. 
" + "By default each line is read as a separate message.") .withRequiredArg .describedAs("reader_class") @@ -126,20 +127,20 @@ object ConsoleProducer { cmdLineProps.put("topic", topic) val props = new Properties() - props.put("metadata.broker.list", brokerList) + props.put(ProducerConfig.MetadataBrokerListProp, brokerList) val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec - props.put("compression.codec", codec.toString) - props.put("producer.type", if(sync) "sync" else "async") + props.put(ProducerConfig.CompressionCodecProp, codec.toString) + props.put(ProducerConfig.ProducerTypeProp, if(sync) "sync" else "async") if(options.has(batchSizeOpt)) - props.put("batch.num.messages", batchSize.toString) - props.put("queue.buffering.max.ms", sendTimeout.toString) - props.put("queue.buffering.max.messages", queueSize.toString) - props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", requestRequiredAcks.toString) - props.put("request.timeout.ms", requestTimeoutMs.toString) - props.put("key.serializer.class", keyEncoderClass) - props.put("serializer.class", valueEncoderClass) - props.put("send.buffer.bytes", socketBuffer.toString) + props.put(AsyncProducerConfig.BatchNumMessagesProp, batchSize.toString) + props.put(AsyncProducerConfig.QueueBufferingMaxMsProp, sendTimeout.toString) + props.put(AsyncProducerConfig.QueueBufferingMaxMessagesProp, queueSize.toString) + props.put(AsyncProducerConfig.QueueEnqueueTimeoutMsProp, queueEnqueueTimeoutMs.toString) + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, requestRequiredAcks.toString) + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, requestTimeoutMs.toString) + props.put(AsyncProducerConfig.KeySerializerClassProp, keyEncoderClass) + props.put(AsyncProducerConfig.SerializerClassProp, valueEncoderClass) + props.put(SyncProducerConfigShared.SendBufferBytesProp, socketBuffer.toString) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) @@ -178,7 +179,7 @@ object ConsoleProducer { props } - trait MessageReader[K,V] { + trait MessageReader[K,V] { def init(inputStream: InputStream, props: Properties) {} def readMessage(): KeyedMessage[K,V] def close() {} @@ -216,7 +217,7 @@ object ConsoleProducer { throw new KafkaException("No key found on line " + lineNumber + ": " + line) case n => new KeyedMessage(topic, - line.substring(0, n), + line.substring(0, n), if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)) } case (line, false) => diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 88ae784..472a283 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -17,7 +17,7 @@ package kafka.producer -import async.MissingConfigException +import kafka.producer.async.{AsyncProducerConfig, MissingConfigException} import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.AppenderSkeleton import org.apache.log4j.helpers.LogLog @@ -64,22 +64,22 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { // check for config parameter validity val props = new Properties() if(brokerList != null) - props.put("metadata.broker.list", brokerList) + props.put(ProducerConfig.MetadataBrokerListProp, brokerList) if(props.isEmpty) - throw new MissingConfigException("The 
metadata.broker.list property should be specified") + throw new MissingConfigException("The " + ProducerConfig.MetadataBrokerListProp + " property should be specified") if(topic == null) throw new MissingConfigException("topic must be specified by the Kafka log4j appender") if(serializerClass == null) { serializerClass = "kafka.serializer.StringEncoder" LogLog.debug("Using default encoder - kafka.serializer.StringEncoder") } - props.put("serializer.class", serializerClass) + props.put(AsyncProducerConfig.SerializerClassProp, serializerClass) //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified - if(producerType != null) props.put("producer.type", producerType) - if(compressionCodec != null) props.put("compression.codec", compressionCodec) - if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) - if(queueSize != null) props.put("queue.buffering.max.messages", queueSize) - if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) + if(producerType != null) props.put(ProducerConfig.ProducerTypeProp, producerType) + if(compressionCodec != null) props.put(ProducerConfig.CompressionCodecProp, compressionCodec) + if(enqueueTimeout != null) props.put(AsyncProducerConfig.QueueEnqueueTimeoutMsProp, enqueueTimeout) + if(queueSize != null) props.put(AsyncProducerConfig.QueueBufferingMaxMessagesProp, queueSize) + if(requiredNumAcks != Int.MaxValue) props.put(SyncProducerConfigShared.RequestRequiredAcksProp, requiredNumAcks.toString) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 7947b18..50e2673 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -24,6 +24,15 @@ import kafka.message.{CompressionCodec, NoCompressionCodec} import kafka.common.{InvalidConfigException, Config} object ProducerConfig extends Config { + final val MetadataBrokerListProp = "metadata.broker.list" + final val PartitionerClassProp = "partitioner.class" + final val ProducerTypeProp = "producer.type" + final val CompressionCodecProp = "compression.codec" + final val CompressedTopicsProp = "compressed.topics" + final val MessageSendMaxRetriesProp = "message.send.max.retries" + final val RetryBackoffMsProp = "retry.backoff.ms" + final val TopicMetadataRefreshIntervalMsProp = "topic.metadata.refresh.interval.ms" + def validate(config: ProducerConfig) { validateClientId(config.clientId) validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages) @@ -31,7 +40,7 @@ object ProducerConfig extends Config { } def validateClientId(clientId: String) { - validateChars("client.id", clientId) + validateChars(SyncProducerConfigShared.ClientIdProp, clientId) } def validateBatchSize(batchSize: Int, queueSize: Int) { @@ -63,22 +72,22 @@ class ProducerConfig private (val props: VerifiableProperties) * format is host1:port1,host2:port2, and the list can be a subset of brokers or * a VIP pointing to a subset of brokers. 
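The ConsoleProducer and KafkaLog4jAppender hunks above compose producer settings from three groups of constants (ProducerConfig, AsyncProducerConfig, SyncProducerConfigShared). A minimal Scala sketch of an async producer configuration built that way, assuming this patch is applied; the broker address, codec, and sizes are illustrative values:

import java.util.Properties
import kafka.producer.{ProducerConfig, SyncProducerConfigShared}
import kafka.producer.async.AsyncProducerConfig

val props = new Properties()
props.put(ProducerConfig.MetadataBrokerListProp, "localhost:9092")        // illustrative broker list
props.put(ProducerConfig.ProducerTypeProp, "async")
props.put(ProducerConfig.CompressionCodecProp, "0")                       // 0 = NoCompressionCodec
props.put(AsyncProducerConfig.BatchNumMessagesProp, "200")
props.put(AsyncProducerConfig.QueueBufferingMaxMsProp, "1000")
props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "0")
props.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder")
val producerConfig = new ProducerConfig(props)                             // runs the validate() checks shown above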
*/ - val brokerList = props.getString("metadata.broker.list") + val brokerList = props.getString(MetadataBrokerListProp) /** the partitioner class for partitioning events amongst sub-topics */ - val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner") + val partitionerClass = props.getString(PartitionerClassProp, "kafka.producer.DefaultPartitioner") /** this parameter specifies whether the messages are sent asynchronously * * or not. Valid values are - async for asynchronous send * * sync for synchronous send */ - val producerType = props.getString("producer.type", "sync") + val producerType = props.getString(ProducerTypeProp, "sync") /** * This parameter allows you to specify the compression codec for all data generated * * by this producer. The default is NoCompressionCodec */ val compressionCodec = { - val prop = props.getString("compression.codec", NoCompressionCodec.name) + val prop = props.getString(CompressionCodecProp, NoCompressionCodec.name) try { CompressionCodec.getCompressionCodec(prop.toInt) } @@ -99,18 +108,18 @@ class ProducerConfig private (val props: VerifiableProperties) * * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ - val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) + val compressedTopics = Utils.parseCsvList(props.getString(CompressedTopicsProp, null)) /** The leader may be unavailable transiently, which can fail the sending of a message. * This property specifies the number of retries when such failures occur. */ - val messageSendMaxRetries = props.getInt("message.send.max.retries", 3) + val messageSendMaxRetries = props.getInt(MessageSendMaxRetriesProp, 3) /** Before each retry, the producer refreshes the metadata of relevant topics. Since leader * election takes a bit of time, this property specifies the amount of time that the producer * waits before refreshing the metadata. 
*/ - val retryBackoffMs = props.getInt("retry.backoff.ms", 100) + val retryBackoffMs = props.getInt(RetryBackoffMsProp, 100) /** * The producer generally refreshes the topic metadata from brokers when there is a failure @@ -120,7 +129,7 @@ class ProducerConfig private (val props: VerifiableProperties) * Important note: the refresh happen only AFTER the message is sent, so if the producer never sends * a message the metadata is never refreshed */ - val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000) + val topicMetadataRefreshIntervalMs = props.getInt(TopicMetadataRefreshIntervalMsProp, 600000) validate(this) } diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 43df70b..c8e2794 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -32,8 +32,8 @@ object ProducerPool { */ def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = { val props = new Properties() - props.put("host", broker.host) - props.put("port", broker.port.toString) + props.put(SyncProducerConfig.HostProp, broker.host) + props.put(SyncProducerConfig.PortProp, broker.port.toString) props.putAll(config.props.props) new SyncProducer(new SyncProducerConfig(props)) } diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index ef32620..aa48416 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -21,41 +21,54 @@ import java.util.Properties import kafka.utils.VerifiableProperties class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared { + import SyncProducerConfig._ + def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) // no need to verify the property since SyncProducerConfig is supposed to be used internally } /** the broker to which the producer sends events */ - val host = props.getString("host") + val host = props.getString(HostProp) /** the port on which the broker is running */ - val port = props.getInt("port") + val port = props.getInt(PortProp) +} + +object SyncProducerConfigShared { + final val SendBufferBytesProp = "send.buffer.bytes" + final val ClientIdProp = "client.id" + final val RequestRequiredAcksProp = "request.required.acks" + final val RequestTimeoutMsProp = "request.timeout.ms" } trait SyncProducerConfigShared { + import SyncProducerConfigShared._ + val props: VerifiableProperties - - val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024) + + val sendBufferBytes = props.getInt(SendBufferBytesProp, 100*1024) /* the client application sending the producer requests */ - val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId) + val clientId = props.getString(ClientIdProp, SyncProducerConfig.DefaultClientId) /* * The required acks of the producer requests - negative value means ack * after the replicas in ISR have caught up to the leader's offset * corresponding to this produce request. */ - val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks) + val requestRequiredAcks = props.getShort(RequestRequiredAcksProp, SyncProducerConfig.DefaultRequiredAcks) /* * The ack timeout of the producer requests. 
Value must be non-negative and non-zero */ - val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs, + val requestTimeoutMs = props.getIntInRange(RequestTimeoutMsProp, SyncProducerConfig.DefaultAckTimeoutMs, (1, Integer.MAX_VALUE)) } object SyncProducerConfig { + final val HostProp = "host" + final val PortProp = "port" val DefaultClientId = "" val DefaultRequiredAcks : Short = 0 val DefaultAckTimeoutMs = 1500 diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index dd39de5..048ce9c 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -18,14 +18,25 @@ package kafka.producer.async import kafka.utils.VerifiableProperties +object AsyncProducerConfig { + final val QueueBufferingMaxMsProp = "queue.buffering.max.ms" + final val QueueBufferingMaxMessagesProp = "queue.buffering.max.messages" + final val QueueEnqueueTimeoutMsProp = "queue.enqueue.timeout.ms" + final val BatchNumMessagesProp = "batch.num.messages" + final val SerializerClassProp = "serializer.class" + final val KeySerializerClassProp = "key.serializer.class" +} + trait AsyncProducerConfig { + import AsyncProducerConfig._ + val props: VerifiableProperties /* maximum time, in milliseconds, for buffering data on the producer queue */ - val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000) + val queueBufferingMaxMs = props.getInt(QueueBufferingMaxMsProp, 5000) /** the maximum size of the blocking queue for buffering on the producer */ - val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000) + val queueBufferingMaxMessages = props.getInt(QueueBufferingMaxMessagesProp, 10000) /** * Timeout for event enqueue: @@ -33,15 +44,15 @@ trait AsyncProducerConfig { * -ve: enqueue will block indefinitely if the queue is full * +ve: enqueue will block up to this many milliseconds if the queue is full */ - val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", -1) + val queueEnqueueTimeoutMs = props.getInt(QueueEnqueueTimeoutMsProp, -1) /** the number of messages batched at the producer */ - val batchNumMessages = props.getInt("batch.num.messages", 200) + val batchNumMessages = props.getInt(BatchNumMessagesProp, 200) /** the serializer class for values */ - val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder") - + val serializerClass = props.getString(SerializerClassProp, "kafka.serializer.DefaultEncoder") + /** the serializer class for keys (defaults to the same as for values) */ - val keySerializerClass = props.getString("key.serializer.class", serializerClass) - + val keySerializerClass = props.getString(KeySerializerClassProp, serializerClass) + } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b774431..a64b210 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
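SyncProducerConfig now exposes its two required keys (host and port) as constants as well. A minimal Scala sketch of the ProducerPool.createSyncProducer pattern shown earlier in this patch, assuming the patch is applied, with an illustrative broker host and port:

import java.util.Properties
import kafka.producer.{SyncProducer, SyncProducerConfig}

val props = new Properties()
props.put(SyncProducerConfig.HostProp, "localhost")    // illustrative broker host
props.put(SyncProducerConfig.PortProp, "9092")         // illustrative broker port
val syncProducer = new SyncProducer(new SyncProducerConfig(props))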
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -25,7 +25,57 @@ import kafka.utils.{VerifiableProperties, ZKConfig, Utils} /** * Configuration settings for the kafka server */ +object KafkaConfig { + final val BrokerIdProp = "broker.id" + final val MessageMaxBytesProp = "message.max.bytes" + final val NumNetworkThreadsProp = "num.network.threads" + final val NumIoThreadsProp = "num.io.threads" + final val QueuedMaxRequestsProp = "queued.max.requests" + final val PortProp = "port" + final val HostNameProp = "host.name" + final val SocketSendBufferBytesProp = "socket.send.buffer.bytes" + final val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" + final val SocketRequestMaxBytesProp = "socket.request.max.bytes" + final val NumPartitionsProp = "num.partitions" + final val LogDirProp = "log.dir" + final val LogDirsProp = "log.dirs" + final val LogSegmentBytesProp = "log.segment.bytes" + final val LogSegmentBytesPerTopicMapProp = "log.segment.bytes.per.topic" + final val LogRollHoursProp = "log.roll.hours" + final val LogRollHoursPerTopicMapProp = "log.roll.hours.per.topic" + final val LogRetentionHoursProp = "log.retention.hours" + final val LogRetentionHoursPerTopicMapProp = "log.retention.hours.per.topic" + final val LogRetentionBytesProp = "log.retention.bytes" + final val LogRetentionBytesPerTopicMapProp = "log.retention.bytes.per.topic" + final val LogCleanupIntervalMinsProp = "log.cleanup.interval.mins" + final val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes" + final val LogIndexIntervalBytesProp = "log.index.interval.bytes" + final val LogFlushIntervalMessagesProp = "log.flush.interval.messages" + final val LogFlushIntervalMsPerTopicMapProp = "log.flush.interval.ms.per.topic" + final val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms" + final val LogFlushIntervalMsProp = "log.flush.interval.ms" + final val AutoCreateTopicsEnableProp = "auto.create.topics.enable" + final val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" + final val ControllerMessageQueueSizeProp= "controller.message.queue.size" + final val DefaultReplicationFactorProp = "default.replication.factor" + final val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms" + final val ReplicaLagMaxMessagesProp = "replica.lag.max.messages" + final val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms" + final val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes" + final val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes" + final val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms" + final val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes" + final val NumReplicaFetchersProp = "num.replica.fetchers" + final val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms" + final val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests" + final val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests" + final val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" + final val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" + final val ControlledShutdownEnableProp = "controlled.shutdown.enable" +} + class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) { + import KafkaConfig._ def this(originalProps: Properties) { this(new 
VerifiableProperties(originalProps)) @@ -33,152 +83,152 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro } /*********** General Configuration ***********/ - + /* the broker id for this server */ - val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) + val brokerId: Int = props.getIntInRange(BrokerIdProp, (0, Int.MaxValue)) /* the maximum size of message that the server can receive */ - val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue)) - + val messageMaxBytes = props.getIntInRange(MessageMaxBytesProp, 1000000, (0, Int.MaxValue)) + /* the number of network threads that the server uses for handling network requests */ - val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue)) + val numNetworkThreads = props.getIntInRange(NumNetworkThreadsProp, 3, (1, Int.MaxValue)) /* the number of io threads that the server uses for carrying out network requests */ - val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue)) - + val numIoThreads = props.getIntInRange(NumIoThreadsProp, 8, (1, Int.MaxValue)) + /* the number of queued requests allowed before blocking the network threads */ - val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue)) - + val queuedMaxRequests = props.getIntInRange(QueuedMaxRequestsProp, 500, (1, Int.MaxValue)) + /*********** Socket Server Configuration ***********/ - + /* the port to listen and accept connections on */ - val port: Int = props.getInt("port", 6667) + val port: Int = props.getInt(PortProp, 6667) /* hostname of broker. If this is set, it will only bind to this address. If this is not set, * it will bind to all interfaces, and publish one to ZK */ - val hostName: String = props.getString("host.name", null) + val hostName: String = props.getString(HostNameProp, null) /* the SO_SNDBUFF buffer of the socket sever sockets */ - val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) - + val socketSendBufferBytes: Int = props.getInt(SocketSendBufferBytesProp, 100*1024) + /* the SO_RCVBUFF buffer of the socket sever sockets */ - val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024) - + val socketReceiveBufferBytes: Int = props.getInt(SocketReceiveBufferBytesProp, 100*1024) + /* the maximum number of bytes in a socket request */ - val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue)) - + val socketRequestMaxBytes: Int = props.getIntInRange(SocketRequestMaxBytesProp, 100*1024*1024, (1, Int.MaxValue)) + /*********** Log Configuration ***********/ /* the default number of log partitions per topic */ - val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) - + val numPartitions = props.getIntInRange(NumPartitionsProp, 1, (1, Int.MaxValue)) + /* the directories in which the log data is kept */ - val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs"))) + val logDirs = Utils.parseCsvList(props.getString(LogDirsProp, props.getString(LogDirProp, "/tmp/kafka-logs"))) require(logDirs.size > 0) - + /* the maximum size of a single log file */ - val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) + val logSegmentBytes = props.getIntInRange(LogSegmentBytesProp, 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) /* the maximum size of a single 
log file for some specific topic */ - val logSegmentBytesPerTopicMap = props.getMap("log.segment.bytes.per.topic", _.toInt > 0).mapValues(_.toInt) + val logSegmentBytesPerTopicMap = props.getMap(LogSegmentBytesPerTopicMapProp, _.toInt > 0).mapValues(_.toInt) /* the maximum time before a new log segment is rolled out */ - val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) + val logRollHours = props.getIntInRange(LogRollHoursProp, 24*7, (1, Int.MaxValue)) /* the number of hours before rolling out a new log segment for some specific topic */ - val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt) + val logRollHoursPerTopicMap = props.getMap(LogRollHoursPerTopicMapProp, _.toInt > 0).mapValues(_.toInt) /* the number of hours to keep a log file before deleting it */ - val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) + val logRetentionHours = props.getIntInRange(LogRetentionHoursProp, 24*7, (1, Int.MaxValue)) /* the number of hours to keep a log file before deleting it for some specific topic*/ - val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt) + val logRetentionHoursPerTopicMap = props.getMap(LogRetentionHoursPerTopicMapProp, _.toInt > 0).mapValues(_.toInt) /* the maximum size of the log before deleting it */ - val logRetentionBytes = props.getLong("log.retention.bytes", -1) + val logRetentionBytes = props.getLong(LogRetentionBytesProp, -1) /* the maximum size of the log for some specific topic before deleting it */ - val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong) + val logRetentionBytesPerTopicMap = props.getMap(LogRetentionBytesPerTopicMapProp, _.toLong > 0).mapValues(_.toLong) /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ - val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) - + val logCleanupIntervalMins = props.getIntInRange(LogCleanupIntervalMinsProp, 10, (1, Int.MaxValue)) + /* the maximum size in bytes of the offset index */ - val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue)) - + val logIndexSizeMaxBytes = props.getIntInRange(LogIndexSizeMaxBytesProp, 10*1024*1024, (4, Int.MaxValue)) + /* the interval with which we add an entry to the offset index */ - val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) + val logIndexIntervalBytes = props.getIntInRange(LogIndexIntervalBytesProp, 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue)) + val logFlushIntervalMessages = props.getIntInRange(LogFlushIntervalMessagesProp, 10000, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt) + val logFlushIntervalMsPerTopicMap = props.getMap(LogFlushIntervalMsPerTopicMapProp, _.toInt > 0).mapValues(_.toInt) /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ - val logFlushSchedulerIntervalMs = 
props.getInt("log.flush.scheduler.interval.ms", 3000) + val logFlushSchedulerIntervalMs = props.getInt(LogFlushSchedulerIntervalMsProp, 3000) /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ - val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs) + val logFlushIntervalMs = props.getInt(LogFlushIntervalMsProp, logFlushSchedulerIntervalMs) /* enable auto creation of topic on the server */ - val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) + val autoCreateTopicsEnable = props.getBoolean(AutoCreateTopicsEnableProp, true) /*********** Replication configuration ***********/ /* the socket timeout for controller-to-broker channels */ - val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000) + val controllerSocketTimeoutMs = props.getInt(ControllerSocketTimeoutMsProp, 30000) /* the buffer size for controller-to-broker-channels */ - val controllerMessageQueueSize= props.getInt("controller.message.queue.size", 10) + val controllerMessageQueueSize= props.getInt(ControllerMessageQueueSizeProp, 10) /* default replication factors for automatically created topics */ - val defaultReplicationFactor = props.getInt("default.replication.factor", 1) + val defaultReplicationFactor = props.getInt(DefaultReplicationFactorProp, 1) /* If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr */ - val replicaLagTimeMaxMs = props.getLong("replica.lag.time.max.ms", 10000) + val replicaLagTimeMaxMs = props.getLong(ReplicaLagTimeMaxMsProp, 10000) /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */ - val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000) + val replicaLagMaxMessages = props.getLong(ReplicaLagMaxMessagesProp, 4000) /* the socket timeout for network requests */ - val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) + val replicaSocketTimeoutMs = props.getInt(ReplicaSocketTimeoutMsProp, ConsumerConfig.SocketTimeout) /* the socket receive buffer for network requests */ - val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize) + val replicaSocketReceiveBufferBytes = props.getInt(ReplicaSocketReceiveBufferBytesProp, ConsumerConfig.SocketBufferSize) /* the number of byes of messages to attempt to fetch */ - val replicaFetchMaxBytes = props.getInt("replica.fetch.max.bytes", ConsumerConfig.FetchSize) + val replicaFetchMaxBytes = props.getInt(ReplicaFetchMaxBytesProp, ConsumerConfig.FetchSize) /* max wait time for each fetcher request issued by follower replicas*/ - val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500) + val replicaFetchWaitMaxMs = props.getInt(ReplicaFetchWaitMaxMsProp, 500) /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */ - val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1) + val replicaFetchMinBytes = props.getInt(ReplicaFetchMinBytesProp, 1) /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. 
*/ - val numReplicaFetchers = props.getInt("num.replica.fetchers", 1) - + val numReplicaFetchers = props.getInt(NumReplicaFetchersProp, 1) + /* the frequency with which the high watermark is saved out to disk */ - val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L) + val replicaHighWatermarkCheckpointIntervalMs = props.getLong(ReplicaHighWatermarkCheckpointIntervalMsProp, 5000L) /* the purge interval (in number of requests) of the fetch request purgatory */ - val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000) + val fetchPurgatoryPurgeIntervalRequests = props.getInt(FetchPurgatoryPurgeIntervalRequestsProp, 10000) /* the purge interval (in number of requests) of the producer request purgatory */ - val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000) + val producerPurgatoryPurgeIntervalRequests = props.getInt(ProducerPurgatoryPurgeIntervalRequestsProp, 10000) /*********** Controlled shutdown configuration ***********/ /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens */ - val controlledShutdownMaxRetries = props.getInt("controlled.shutdown.max.retries", 3) + val controlledShutdownMaxRetries = props.getInt(ControlledShutdownMaxRetriesProp, 3) /** Before each retry, the system needs time to recover from the state that caused the previous failure (Controller * fail over, replica lag etc). This config determines the amount of time to wait before retrying. */ - val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000) + val controlledShutdownRetryBackoffMs = props.getInt(ControlledShutdownRetryBackoffMsProp, 5000) /* enable controlled shutdown of the server */ - val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", false) + val controlledShutdownEnable = props.getBoolean(ControlledShutdownEnableProp, false) } diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 3c18286..4226dab 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -21,6 +21,8 @@ import joptsimple.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; +import kafka.producer.SyncProducerConfigShared$; +import kafka.producer.async.AsyncProducerConfig$; import kafka.utils.Utils; import java.io.File; @@ -217,7 +219,7 @@ public class KafkaMigrationTool { Properties kafkaProducerProperties_08 = new Properties(); kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08)); - kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + kafkaProducerProperties_08.setProperty(AsyncProducerConfig$.MODULE$.SerializerClassProp(), "kafka.serializer.DefaultEncoder"); // create a producer channel instead int queueSize = options.valueOf(queueSizeOpt); ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(queueSize); @@ -252,10 +254,10 @@ public class KafkaMigrationTool { migrationThreads.add(thread); } - String clientId = kafkaProducerProperties_08.getProperty("client.id"); + String clientId = kafkaProducerProperties_08.getProperty(SyncProducerConfigShared$.MODULE$.ClientIdProp()); // start producer threads for (int i = 0; i < numProducers; i++) { - 
kafkaProducerProperties_08.put("client.id", clientId + "-" + i); + kafkaProducerProperties_08.put(SyncProducerConfigShared$.MODULE$.ClientIdProp(), clientId + "-" + i); ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08); Producer producer = new Producer(producerConfig_08); ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i); diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 814d61a..7516624 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -20,11 +20,12 @@ package kafka.tools import joptsimple.OptionParser import java.util.concurrent.{Executors, CountDownLatch} import java.util.Properties -import kafka.producer.{KeyedMessage, ProducerConfig, Producer} +import kafka.producer.{SyncProducerConfigShared, KeyedMessage, ProducerConfig, Producer} import kafka.consumer._ -import kafka.utils.{Logging, ZkUtils} +import kafka.utils.{ZKConfig, Logging, ZkUtils} import kafka.api.OffsetRequest import kafka.message.CompressionCodec +import kafka.producer.async.AsyncProducerConfig object ReplayLogProducer extends Logging { @@ -42,12 +43,12 @@ object ReplayLogProducer extends Logging { // consumer properties val consumerProps = new Properties - consumerProps.put("group.id", GroupId) - consumerProps.put("zookeeper.connect", config.zkConnect) - consumerProps.put("consumer.timeout.ms", "10000") - consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString) - consumerProps.put("fetch.message.max.bytes", (1024*1024).toString) - consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString) + consumerProps.put(ConsumerConfig.GroupIdProp, GroupId) + consumerProps.put(ZKConfig.ZkConnectProp, config.zkConnect) + consumerProps.put(ConsumerConfig.ConsumerTimeoutMsProp, "10000") + consumerProps.put(ConsumerConfig.AutoOffsetResetProp, OffsetRequest.SmallestTimeString) + consumerProps.put(ConsumerConfig.FetchMessageMaxBytesProp, (1024*1024).toString) + consumerProps.put(ConsumerConfig.SocketReceiveBufferBytesProp, (2 * 1024 * 1024).toString) val consumerConfig = new ConsumerConfig(consumerProps) val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads)) @@ -139,15 +140,14 @@ object ReplayLogProducer extends Logging { class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) val props = new Properties() - props.put("metadata.broker.list", config.brokerList) - props.put("reconnect.interval", Integer.MAX_VALUE.toString) - props.put("send.buffer.bytes", (64*1024).toString) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") - + props.put(ProducerConfig.MetadataBrokerListProp, config.brokerList) + props.put(SyncProducerConfigShared.SendBufferBytesProp, (64*1024).toString) + props.put(ProducerConfig.CompressionCodecProp, config.compressionCodec.codec.toString) + props.put(AsyncProducerConfig.BatchNumMessagesProp, config.batchSize.toString) + props.put(AsyncProducerConfig.QueueEnqueueTimeoutMsProp, "-1") + if(config.isAsync) - props.put("producer.type", "async") + props.put(ProducerConfig.ProducerTypeProp, "async") val 
producerConfig = new ProducerConfig(props) val producer = new Producer[Array[Byte], Array[Byte]](producerConfig) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index d53d511..b24cd82 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -368,7 +368,7 @@ object ZkUtils extends Logging { case e2 => throw e2 } } - + def deletePath(client: ZkClient, path: String): Boolean = { try { client.delete(path) @@ -391,7 +391,7 @@ object ZkUtils extends Logging { case e2 => throw e2 } } - + def maybeDeletePath(zkUrl: String, dir: String) { try { val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) @@ -621,7 +621,7 @@ object ZkUtils extends Logging { partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId)) }.flatten } - + def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = { // read the partitions and their new replica list val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1 @@ -762,17 +762,25 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) def consumerOwnerDir = consumerGroupDir + "/owners/" + topic } +object ZKConfig { + final val ZkConnectProp = "zookeeper.connect" + final val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms" + final val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms" + final val ZkSyncTimeMsProp = "zookeeper.sync.time.ms" +} class ZKConfig(props: VerifiableProperties) { + import ZKConfig._ + /** ZK host string */ - val zkConnect = props.getString("zookeeper.connect") + val zkConnect = props.getString(ZkConnectProp) /** zookeeper session timeout */ - val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000) + val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000) /** the max time that the client waits to establish a connection to zookeeper */ - val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs) + val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp,zkSessionTimeoutMs) /** how far a ZK follower can be behind a ZK leader */ - val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000) + val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000) } diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala index c4aed10..a8b802e 100644 --- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala +++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
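The ZkUtils.scala hunk above ends by adding a ZKConfig companion object that holds the ZooKeeper property names as constants, which the ZKConfig class then imports instead of repeating the string literals. Below is a minimal, self-contained sketch of that pattern; the ZKConfigSketch name is hypothetical and plain java.util.Properties stands in for Kafka's VerifiableProperties, so the accessors here are illustrative rather than the patched API.

    import java.util.Properties

    // One definition per ZooKeeper property name; callers reference the constant,
    // never the raw string.
    object ZKConfigSketch {
      final val ZkConnectProp          = "zookeeper.connect"
      final val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
    }

    // Plain java.util.Properties stands in for Kafka's VerifiableProperties here.
    class ZKConfigSketch(props: Properties) {
      import ZKConfigSketch._
      val zkConnect          = props.getProperty(ZkConnectProp)
      val zkSessionTimeoutMs = props.getProperty(ZkSessionTimeoutMsProp, "6000").toInt
    }

    object ZKConfigSketchDemo {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ZKConfigSketch.ZkConnectProp, "localhost:2181") // constant, not a repeated literal
        val cfg = new ZKConfigSketch(props)
        println(s"${cfg.zkConnect}, session timeout ${cfg.zkSessionTimeoutMs} ms")
      }
    }

Keeping each literal in exactly one place means a later rename only touches the companion object, and Java callers reach the same constants through Scala's generated accessors, which is why the Java files in this patch call them as methods (ZKConfig.ZkConnectProp(), AsyncProducerConfig$.MODULE$.SerializerClassProp()).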
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -21,6 +21,8 @@ import java.util.Properties import kafka.consumer._ import kafka.producer._ import kafka.message._ +import kafka.api.OffsetRequest +import kafka.utils.ZKConfig object TestEndToEndLatency { def main(args: Array[String]) { @@ -33,24 +35,24 @@ object TestEndToEndLatency { val zkConnect = args(1) val numMessages = args(2).toInt val topic = "test" - + val consumerProps = new Properties() - consumerProps.put("group.id", topic) - consumerProps.put("auto.commit", "true") - consumerProps.put("auto.offset.reset", "largest") - consumerProps.put("zookeeper.connect", zkConnect) - consumerProps.put("socket.timeout.ms", 1201000.toString) - + consumerProps.put(ConsumerConfig.GroupIdProp, topic) + consumerProps.put(ConsumerConfig.AutoCommitEnableProp, "true") + consumerProps.put(ConsumerConfig.AutoOffsetResetProp, OffsetRequest.LargestTimeString) + consumerProps.put(ZKConfig.ZkConnectProp, zkConnect) + consumerProps.put(ConsumerConfig.SocketTimeoutMsProp, 1201000.toString) + val config = new ConsumerConfig(consumerProps) val connector = Consumer.create(config) var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head val iter = stream.iterator val producerProps = new Properties() - producerProps.put("metadata.broker.list", brokerList) - producerProps.put("producer.type", "sync") + producerProps.put(ProducerConfig.MetadataBrokerListProp, brokerList) + producerProps.put(ProducerConfig.ProducerTypeProp, "sync") val producer = new Producer[Any, Any](new ProducerConfig(producerProps)) - + val message = new Message("hello there beautiful".getBytes) var totalTime = 0.0 for(i <- 0 until numMessages) { diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala index 31534ca..7623806 100644 --- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala +++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -20,6 +20,7 @@ package kafka import consumer._ import utils.Utils import java.util.concurrent.CountDownLatch +import kafka.api.OffsetRequest object TestZKConsumerOffsets { def main(args: Array[String]): Unit = { @@ -29,10 +30,10 @@ object TestZKConsumerOffsets { } println("Starting consumer...") val topic = args(1) - val autoOffsetReset = args(2) + val autoOffsetReset = args(2) val props = Utils.loadProps(args(0)) - props.put("auto.offset.reset", "largest") - + props.put(ConsumerConfig.AutoOffsetResetProp, OffsetRequest.LargestTimeString) + val config = new ConsumerConfig(props) val consumerConnector: ConsumerConnector = Consumer.create(config) val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> 1)) @@ -49,7 +50,7 @@ object TestZKConsumerOffsets { override def run() = { consumerConnector.shutdown threadList.foreach(_.shutdown) - println("consumer threads shutted down") + println("consumer threads shut down") } }) } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index fcfc583..023320e 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -6,7 +6,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -32,6 +32,7 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ +import kafka.producer.async.AsyncProducerConfig class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -346,11 +347,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar compression: CompressionCodec = NoCompressionCodec): List[String] = { val header = "test-%d-%d".format(config.brokerId, partition) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") - props.put("compression.codec", compression.codec.toString) - props.put("key.serializer.class", classOf[IntEncoder].getName.toString) - props.put("serializer.class", classOf[StringEncoder].getName.toString) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.PartitionerClassProp, "kafka.utils.FixedValuePartitioner") + props.put(ProducerConfig.CompressedTopicsProp, compression.codec.toString) + props.put(AsyncProducerConfig.KeySerializerClassProp, classOf[IntEncoder].getName.toString) + props.put(AsyncProducerConfig.SerializerClassProp, classOf[StringEncoder].getName.toString) val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props)) val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x) producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) @@ -359,17 +360,17 @@ class 
ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar ms.toList } - def sendMessages(config: KafkaConfig, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, + def sendMessages(config: KafkaConfig, + messagesPerNode: Int, + header: String, + compression: CompressionCodec, numParts: Int): List[String]= { var messages: List[String] = Nil val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") - props.put("key.serializer.class", classOf[IntEncoder].getName.toString) - props.put("serializer.class", classOf[StringEncoder].getName) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.PartitionerClassProp, "kafka.utils.FixedValuePartitioner") + props.put(AsyncProducerConfig.KeySerializerClassProp, classOf[IntEncoder].getName.toString) + props.put(AsyncProducerConfig.SerializerClassProp, classOf[StringEncoder].getName) val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props)) for (partition <- 0 until numParts) { val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x) @@ -388,7 +389,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar messages } - def getMessages(nMessagesPerThread: Int, + def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= { var messages: List[String] = Nil for((topic, messageStreams) <- topicMessageStreams) { diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index f764151..d0b34cd 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import junit.framework.Assert._ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} +import kafka.producer.{SyncProducerConfigShared, KeyedMessage, Producer, ProducerConfig} import org.apache.log4j.{Level, Logger} import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness @@ -30,6 +30,7 @@ import scala.collection._ import kafka.admin.CreateTopicCommand import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} import kafka.utils.{TestUtils, Utils} +import kafka.message.DefaultCompressionCodec /** * End to end tests of the primitive apis against a local server @@ -107,7 +108,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testDefaultEncoderProducerAndFetchWithCompression() { val topic = "test-topic" val props = producer.config.props.props - props.put("compression", "true") + props.put(ProducerConfig.CompressionCodecProp, DefaultCompressionCodec.name) val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) @@ -309,7 +310,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testPipelinedProduceRequests() { createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId) val props = producer.config.props.props - props.put("request.required.acks", "0") + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "0") val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props)) // send some messages diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index ce893bf..6732aec 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -58,7 +58,7 @@ class LogManagerTest extends JUnit3Suite { logManager.logDirs.map(Utils.rm(_)) super.tearDown() } - + @Test def testCreateLog() { val log = logManager.getOrCreateLog(name, 0) @@ -175,18 +175,18 @@ class LogManagerTest extends JUnit3Suite { assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed), ellapsed < 2*config.logFlushSchedulerIntervalMs) } - + @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories val props = TestUtils.createBrokerConfig(0, -1) - val dirs = Seq(TestUtils.tempDir().getAbsolutePath, - TestUtils.tempDir().getAbsolutePath, + val dirs = Seq(TestUtils.tempDir().getAbsolutePath, + TestUtils.tempDir().getAbsolutePath, TestUtils.tempDir().getAbsolutePath) - props.put("log.dirs", dirs.mkString(",")) + props.put(KafkaConfig.LogDirsProp, dirs.mkString(",")) logManager.shutdown() logManager = new LogManager(new KafkaConfig(props), scheduler, time) - + // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { logManager.getOrCreateLog("test", partition) @@ -195,13 +195,13 @@ class LogManagerTest extends JUnit3Suite { assertTrue("Load should balance evenly", counts.max <= counts.min + 1) } } - + def testTwoLogManagersUsingSameDirFails() { try { new LogManager(logManager.config, scheduler, time) fail("Should not be able to create a second log manager instance with the same data directory") } catch { - case e: KafkaException => // this is good + case e: KafkaException => // this is good } } } diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index 1a9cc01..9f026f6 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -31,9 +31,10 @@ import kafka.admin.CreateTopicCommand import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.producer.ProducerConfig object LogOffsetTest { - val random = new Random() + val random = new Random() } class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -198,16 +199,15 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { private def createBrokerConfig(nodeId: Int, port: Int): Properties = { val props = new Properties - props.put("broker.id", nodeId.toString) - props.put("port", port.toString) - props.put("log.dir", getLogDir.getAbsolutePath) - props.put("log.flush.interval.messages", "1") - props.put("enable.zookeeper", "false") - props.put("num.partitions", "20") - props.put("log.retention.hours", "10") - props.put("log.cleanup.interval.mins", "5") - props.put("log.segment.bytes", logSize.toString) - props.put("zookeeper.connect", zkConnect.toString) + props.put(KafkaConfig.BrokerIdProp, nodeId.toString) + props.put(KafkaConfig.PortProp, port.toString) + props.put(KafkaConfig.LogDirProp, getLogDir.getAbsolutePath) + props.put(KafkaConfig.LogFlushIntervalMessagesProp, "1") + props.put(KafkaConfig.NumPartitionsProp, "20") + props.put(KafkaConfig.LogRetentionHoursProp, "10") + props.put(KafkaConfig.LogCleanupIntervalMinsProp, "5") + props.put(KafkaConfig.LogSegmentBytesProp, logSize.toString) + props.put(ZKConfig.ZkConnectProp, zkConnect.toString) props } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 67497dd..13f9570 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -52,7 +52,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with super.setUp() val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk) - val logDirZkPath = propsZk.getProperty("log.dir") + val logDirZkPath = propsZk.getProperty(KafkaConfig.LogDirProp) logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) server = TestUtils.createServer(config); diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 1781bc0..c980a8f 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -60,12 +60,12 @@ class AsyncProducerTest extends JUnit3Suite { } val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("producer.type", "async") - props.put("queue.buffering.max.messages", "10") - props.put("batch.num.messages", "1") - props.put("queue.enqueue.timeout.ms", "0") + props.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.ProducerTypeProp, "async") + props.put(AsyncProducerConfig.QueueBufferingMaxMessagesProp, "10") + props.put(AsyncProducerConfig.BatchNumMessagesProp, "1") + 
props.put(AsyncProducerConfig.QueueEnqueueTimeoutMsProp, "0") val config = new ProducerConfig(props) val produceData = getProduceData(12) @@ -85,10 +85,10 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testProduceAfterClosed() { val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("producer.type", "async") - props.put("batch.num.messages", "1") + props.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.ProducerTypeProp, "async") + props.put(AsyncProducerConfig.BatchNumMessagesProp, "1") val config = new ProducerConfig(props) val produceData = getProduceData(10) @@ -165,7 +165,7 @@ class AsyncProducerTest extends JUnit3Suite { producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes))) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) val broker1 = new Broker(0, "localhost", 9092) val broker2 = new Broker(1, "localhost", 9093) broker1 @@ -215,7 +215,7 @@ class AsyncProducerTest extends JUnit3Suite { def testSerializeEvents() { val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m)) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // form expected partitions metadata val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092) @@ -241,7 +241,7 @@ class AsyncProducerTest extends JUnit3Suite { val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]] producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes))) val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // form expected partitions metadata @@ -271,7 +271,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testNoBroker() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -302,7 +302,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testIncompatibleEncoder() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) val producer=new Producer[String, String](config) @@ -319,7 +319,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testRandomPartitioner() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) val config = 
new ProducerConfig(props) // create topic metadata with 0 partitions @@ -358,8 +358,8 @@ class AsyncProducerTest extends JUnit3Suite { def testBrokerListAndAsync() { return val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("producer.type", "async") - props.put("batch.num.messages", "5") + props.put(ProducerConfig.ProducerTypeProp, "async") + props.put(AsyncProducerConfig.BatchNumMessagesProp, "5") val config = new ProducerConfig(props) @@ -393,10 +393,10 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testFailedSendRetryLogic() { val props = new Properties() - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("request.required.acks", "1") - props.put("serializer.class", classOf[StringEncoder].getName.toString) - props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(configs)) + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "1") + props.put(AsyncProducerConfig.SerializerClassProp, classOf[StringEncoder].getName.toString) + props.put(AsyncProducerConfig.KeySerializerClassProp, classOf[NullEncoder[Int]].getName.toString) val config = new ProducerConfig(props) @@ -469,8 +469,8 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testInvalidConfiguration() { val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("producer.type", "async") + props.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") + props.put(ProducerConfig.ProducerTypeProp, "async") try { new ProducerConfig(props) fail("should complain about wrong config") @@ -495,11 +495,11 @@ class AsyncProducerTest extends JUnit3Suite { val broker1 = new Broker(brokerId, brokerHost, brokerPort) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } - + def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = { new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes)): _*) } - + def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = { new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(key = key, bytes = m)): _*) } diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index b511d90..40e46c6 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -33,7 +33,7 @@ import kafka.common.FailedToSendMessageException import org.junit.Assert.assertTrue import org.junit.Assert.assertFalse import org.junit.Assert.assertEquals - +import kafka.producer.async.AsyncProducerConfig class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @@ -66,10 +66,6 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ server2 = TestUtils.createServer(config2) servers = List(server1,server2) - val props = new Properties() - props.put("host", "localhost") - props.put("port", port1.toString) - consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "") consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "") @@ -98,8 +94,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) val props1 = new 
util.Properties() - props1.put("metadata.broker.list", "localhost:80,localhost:81") - props1.put("serializer.class", "kafka.serializer.StringEncoder") + props1.put(ProducerConfig.MetadataBrokerListProp, "localhost:80,localhost:81") + props1.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") val producerConfig1 = new ProducerConfig(props1) val producer1 = new Producer[String, String](producerConfig1) try{ @@ -113,8 +109,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val props2 = new util.Properties() - props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) - props2.put("serializer.class", "kafka.serializer.StringEncoder") + props2.put(ProducerConfig.MetadataBrokerListProp, "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) + props2.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") val producerConfig2= new ProducerConfig(props2) val producer2 = new Producer[String, String](producerConfig2) try{ @@ -126,8 +122,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val props3 = new util.Properties() - props3.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - props3.put("serializer.class", "kafka.serializer.StringEncoder") + props3.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props3.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") val producerConfig3 = new ProducerConfig(props3) val producer3 = new Producer[String, String](producerConfig3) try{ @@ -142,16 +138,16 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testSendToNewTopic() { val props1 = new util.Properties() - props1.put("serializer.class", "kafka.serializer.StringEncoder") - props1.put("partitioner.class", "kafka.utils.StaticPartitioner") - props1.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - props1.put("request.required.acks", "2") - props1.put("request.timeout.ms", "1000") + props1.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") + props1.put(ProducerConfig.PartitionerClassProp, "kafka.utils.StaticPartitioner") + props1.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props1.put(SyncProducerConfigShared.RequestRequiredAcksProp, "2") + props1.put(SyncProducerConfigShared.RequestTimeoutMsProp, "1000") val props2 = new util.Properties() props2.putAll(props1) - props2.put("request.required.acks", "3") - props2.put("request.timeout.ms", "1000") + props2.put(SyncProducerConfigShared.RequestRequiredAcksProp, "3") + props2.put(SyncProducerConfigShared.RequestTimeoutMsProp, "1000") val producerConfig1 = new ProducerConfig(props1) val producerConfig2 = new ProducerConfig(props2) @@ -201,11 +197,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testSendWithDeadBroker() { val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("partitioner.class", "kafka.utils.StaticPartitioner") - props.put("request.timeout.ms", "2000") - props.put("request.required.acks", "1") - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") + 
props.put(ProducerConfig.PartitionerClassProp, "kafka.utils.StaticPartitioner") + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, "2000") + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "1") + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) val topic = "new-topic" // create topic @@ -260,12 +256,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ def testAsyncSendCanCorrectlyFailWithTimeout() { val timeoutMs = 500 val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("partitioner.class", "kafka.utils.StaticPartitioner") - props.put("request.timeout.ms", String.valueOf(timeoutMs)) - props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - props.put("request.required.acks", "1") - props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") + props.put(AsyncProducerConfig.SerializerClassProp, "kafka.serializer.StringEncoder") + props.put(ProducerConfig.PartitionerClassProp, "kafka.utils.StaticPartitioner") + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, String.valueOf(timeoutMs)) + props.put(ProducerConfig.MetadataBrokerListProp, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "1") + props.put(SyncProducerConfigShared.ClientIdProp, "ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index db46247..e87c3e8 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -23,7 +23,8 @@ import kafka.utils.TestUtils._ import kafka.utils.IntEncoder import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness -import kafka.producer.{ProducerConfig, KeyedMessage, Producer} +import kafka.producer.{SyncProducerConfigShared, ProducerConfig, KeyedMessage, Producer} +import kafka.producer.async.AsyncProducerConfig class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -47,10 +48,10 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0)) var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - + val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) - producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) - producerProps.put("request.required.acks", "-1") + producerProps.put(AsyncProducerConfig.KeySerializerClassProp, classOf[IntEncoder].getName.toString) + producerProps.put(SyncProducerConfigShared.RequestRequiredAcksProp, "-1") def testHWCheckpointNoFailuresSingleLogSegment { @@ -74,7 +75,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(numMessages.toInt) // give some time for the follower 1 to record leader HW - assertTrue("Failed to update highwatermark for follower after 1000 ms", + assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, 
10000)) @@ -103,7 +104,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue("Leader should get elected", leader.isDefined) // NOTE: this is to avoid transient test failures assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) - + assertEquals(0L, hwFile1.read(topic, 0)) sendMessages(1) @@ -137,7 +138,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(1) hw += 1 - + // give some time for follower 1 to record leader HW of 60 assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 2000)) @@ -205,7 +206,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(2) var hw = 2L - + // allow some time for the follower to get the leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000)) @@ -230,7 +231,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(2) hw += 2 - + // allow some time for the follower to get the leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, 1000)) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 947e795..4382d19 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -29,6 +29,7 @@ import kafka.utils.TestUtils._ import kafka.admin.CreateTopicCommand import kafka.api.FetchRequestBuilder import kafka.utils.{TestUtils, Utils} +import kafka.producer.async.AsyncProducerConfig class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val port = TestUtils.choosePort @@ -45,7 +46,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { var server = new KafkaServer(config) server.startup() val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config))) - producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString) + producerConfig.put(AsyncProducerConfig.KeySerializerClassProp, classOf[IntEncoder].getName.toString) var producer = new Producer[Int, String](new ProducerConfig(producerConfig)) // create topic @@ -62,7 +63,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue(cleanShutDownFile.exists) } producer.close() - + /* now restart the server and check that the written data is still readable and everything still works */ server = new KafkaServer(config) server.startup() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a4dcca6..11aa246 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -38,6 +38,7 @@ import collection.mutable.Map import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import junit.framework.Assert +import kafka.producer.async.AsyncProducerConfig /** @@ -124,13 +125,13 @@ object TestUtils extends Logging { */ def createBrokerConfig(nodeId: Int, port: Int): Properties = { val props = new Properties - 
props.put("broker.id", nodeId.toString) - props.put("host.name", "localhost") - props.put("port", port.toString) - props.put("log.dir", TestUtils.tempDir().getAbsolutePath) - props.put("log.flush.interval.messages", "1") - props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) - props.put("replica.socket.timeout.ms", "1500") + props.put(KafkaConfig.BrokerIdProp, nodeId.toString) + props.put(KafkaConfig.HostNameProp, "localhost") + props.put(KafkaConfig.PortProp, port.toString) + props.put(KafkaConfig.LogDirProp, TestUtils.tempDir().getAbsolutePath) + props.put(KafkaConfig.LogFlushIntervalMessagesProp, "1") + props.put(ZKConfig.ZkConnectProp, TestZKUtils.zookeeperConnect) + props.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "1500") props } @@ -140,15 +141,15 @@ object TestUtils extends Logging { def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String, consumerTimeout: Long = -1): Properties = { val props = new Properties - props.put("zookeeper.connect", zkConnect) - props.put("group.id", groupId) - props.put("consumer.id", consumerId) - props.put("consumer.timeout.ms", consumerTimeout.toString) - props.put("zookeeper.session.timeout.ms", "400") - props.put("zookeeper.sync.time.ms", "200") - props.put("auto.commit.interval.ms", "1000") - props.put("rebalance.max.retries", "4") - props.put("auto.offset.reset", "smallest") + props.put(ZKConfig.ZkConnectProp, zkConnect) + props.put(ConsumerConfig.GroupIdProp, groupId) + props.put(ConsumerConfig.ConsumerIdProp, consumerId) + props.put(ConsumerConfig.ConsumerTimeoutMsProp, consumerTimeout.toString) + props.put(ZKConfig.ZkSessionTimeoutMsProp, "400") + props.put(ZKConfig.ZkSyncTimeMsProp, "200") + props.put(ConsumerConfig.AutoCommitIntervalMsProp, "1000") + props.put(ConsumerConfig.RebalanceMaxRetriesProp, "4") + props.put(ConsumerConfig.AutoOffsetResetProp, OffsetRequest.SmallestTimeString) props } @@ -290,39 +291,37 @@ object TestUtils extends Logging { /** * Create a producer for the given host and port */ - def createProducer[K, V](brokerList: String, - encoder: Encoder[V] = new DefaultEncoder(), + def createProducer[K, V](brokerList: String, + encoder: Encoder[V] = new DefaultEncoder(), keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = { val props = new Properties() - props.put("metadata.broker.list", brokerList) - props.put("send.buffer.bytes", "65536") - props.put("connect.timeout.ms", "100000") - props.put("reconnect.interval", "10000") - props.put("serializer.class", encoder.getClass.getCanonicalName) - props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName) + props.put(ProducerConfig.MetadataBrokerListProp, brokerList) + props.put(SyncProducerConfigShared.SendBufferBytesProp, "65536") + props.put(AsyncProducerConfig.SerializerClassProp, encoder.getClass.getCanonicalName) + props.put(AsyncProducerConfig.KeySerializerClassProp, keyEncoder.getClass.getCanonicalName) new Producer[K, V](new ProducerConfig(props)) } def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = { val props = new Properties() - props.put("metadata.broker.list", brokerList) - props.put("partitioner.class", partitioner) - props.put("message.send.max.retries", "3") - props.put("retry.backoff.ms", "1000") - props.put("request.timeout.ms", "500") - props.put("request.required.acks", "-1") - props.put("serializer.class", classOf[StringEncoder].getName.toString) + props.put(ProducerConfig.MetadataBrokerListProp, brokerList) + 
props.put(ProducerConfig.PartitionerClassProp, partitioner) + props.put(ProducerConfig.MessageSendMaxRetriesProp, "3") + props.put(ProducerConfig.RetryBackoffMsProp, "1000") + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, "500") + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "-1") + props.put(AsyncProducerConfig.SerializerClassProp, classOf[StringEncoder].getName.toString) props } def getSyncProducerConfig(port: Int): Properties = { val props = new Properties() - props.put("host", "localhost") - props.put("port", port.toString) - props.put("request.timeout.ms", "500") - props.put("request.required.acks", "1") - props.put("serializer.class", classOf[StringEncoder].getName.toString) + props.put(SyncProducerConfig.HostProp, "localhost") + props.put(SyncProducerConfig.PortProp, port.toString) + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, "500") + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, "1") + props.put(AsyncProducerConfig.SerializerClassProp, classOf[StringEncoder].getName.toString) props } @@ -365,9 +364,9 @@ object TestUtils extends Logging { /** * Create a wired format request based on simple basic information */ - def produceRequest(topic: String, - partition: Int, - message: ByteBufferMessageSet, + def produceRequest(topic: String, + partition: Int, + message: ByteBufferMessageSet, acks: Int = SyncProducerConfig.DefaultRequiredAcks, timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, correlationId: Int = 0, @@ -375,10 +374,10 @@ object TestUtils extends Logging { produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId) } - def produceRequestWithAcks(topics: Seq[String], - partitions: Seq[Int], - message: ByteBufferMessageSet, - acks: Int = SyncProducerConfig.DefaultRequiredAcks, + def produceRequestWithAcks(topics: Seq[String], + partitions: Seq[Int], + message: ByteBufferMessageSet, + acks: Int = SyncProducerConfig.DefaultRequiredAcks, timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, correlationId: Int = 0, clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = { @@ -445,7 +444,7 @@ object TestUtils extends Logging { leaderLock.unlock() } } - + /** * Execute the given block. If it throws an assert error, retry. Repeat * until no error is thrown or the time limit ellapses @@ -507,7 +506,7 @@ object TestUtils extends Logging { TestUtils.waitUntilTrue(() => servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition))), timeout)) } - + } object TestZKUtils { diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 63f099a..c701505 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -17,22 +17,23 @@ package kafka.examples; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; +import kafka.utils.ZKConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; public class Consumer extends Thread { private final ConsumerConnector consumer; private final String topic; - + public Consumer(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( @@ -43,16 +44,16 @@ public class Consumer extends Thread private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); - props.put("zookeeper.connect", KafkaProperties.zkConnect); - props.put("group.id", KafkaProperties.groupId); - props.put("zookeeper.session.timeout.ms", "400"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); + props.put(ZKConfig.ZkConnectProp(), KafkaProperties.zkConnect); + props.put(ConsumerConfig.GroupIdProp(), KafkaProperties.groupId); + props.put(ZKConfig.ZkSessionTimeoutMsProp(), "400"); + props.put(ZKConfig.ZkSyncTimeMsProp(), "200"); + props.put(ConsumerConfig.AutoCommitIntervalMsProp(), "1000"); return new ConsumerConfig(props); } - + public void run() { Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(1)); diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..3b76ee3 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -17,9 +17,11 @@ package kafka.examples; -import java.util.Properties; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; +import kafka.producer.async.AsyncProducerConfig$; + +import java.util.Properties; public class Producer extends Thread { @@ -29,14 +31,14 @@ public class Producer extends Thread public Producer(String topic) { - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("metadata.broker.list", "localhost:9092"); + props.put(AsyncProducerConfig$.MODULE$.SerializerClassProp(), "kafka.serializer.StringEncoder"); + props.put(ProducerConfig.MetadataBrokerListProp(), "localhost:9092"); // Use random partitioner. Don't need the key type. Just set it to Integer. // The message is of type String. 
producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); this.topic = topic; } - + public void run() { int messageNo = 1; while(true) diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index 3158a22..6f39871 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -22,10 +22,11 @@ import java.util.concurrent.atomic.AtomicLong import java.nio.channels.ClosedByInterruptException import org.apache.log4j.Logger import kafka.message.Message -import kafka.utils.ZkUtils +import kafka.utils.{ZKConfig, ZkUtils} import java.util.{Random, Properties} import kafka.consumer._ import java.text.SimpleDateFormat +import kafka.api.OffsetRequest /** * Performance test for the full zookeeper consumer @@ -124,12 +125,13 @@ object ConsumerPerformance { } val props = new Properties - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", "5000") + props.put(ConsumerConfig.GroupIdProp, options.valueOf(groupIdOpt)) + props.put(ConsumerConfig.SocketReceiveBufferBytesProp, options.valueOf(socketBufferSizeOpt).toString) + props.put(ConsumerConfig.FetchMessageMaxBytesProp, options.valueOf(fetchSizeOpt).toString) + props.put(ConsumerConfig.AutoOffsetResetProp, if(options.has(resetBeginningOffsetOpt)) + OffsetRequest.LargestTimeString else OffsetRequest.SmallestTimeString) + props.put(ZKConfig.ZkConnectProp, options.valueOf(zkConnectOpt)) + props.put(ConsumerConfig.ConsumerTimeoutMsProp, "5000") val consumerConfig = new ConsumerConfig(props) val numThreads = options.valueOf(numThreadsOpt).intValue val topic = options.valueOf(topicOpt) diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index ad2ac26..1b370f2 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
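The ConsumerPerformance.scala hunk above replaces not only the property keys but also the literal offset-reset values, using OffsetRequest.LargestTimeString and OffsetRequest.SmallestTimeString. The short sketch below, with hypothetical stand-in constants, shows the failure mode this avoids: a typo in a raw key or value string is only noticed at runtime, typically as a silent fallback to the default, whereas a misspelled constant does not compile.

    object OffsetResetSketch {
      // Hypothetical stand-ins for the real constants; the names are illustrative only.
      final val AutoOffsetResetProp = "auto.offset.reset"
      final val LargestTimeString   = "largest"
      final val SmallestTimeString  = "smallest"

      def main(args: Array[String]): Unit = {
        val props = new java.util.Properties()
        // A literal key with a typo ("auto.offest.reset") would be accepted here and
        // silently ignored by the consumer; AutoOffsetResetProp cannot be misspelled
        // without a compile error.
        props.put(AutoOffsetResetProp, SmallestTimeString)
        println(props.getProperty(AutoOffsetResetProp, LargestTimeString)) // prints "smallest"
      }
    }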
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -27,7 +27,8 @@ import kafka.serializer._ import java.util._ import collection.immutable.List import kafka.utils.{VerifiableProperties, Logging} -import kafka.metrics.KafkaMetricsReporter +import kafka.metrics.{KafkaCSVMetricsReporter, KafkaMetricsConfig, KafkaMetricsReporter} +import kafka.producer.async.AsyncProducerConfig /** @@ -153,13 +154,13 @@ object ProducerPerformance extends Logging { if (csvMetricsReporterEnabled) { val props = new Properties() - props.put("kafka.metrics.polling.interval.secs", "1") - props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + props.put(KafkaMetricsConfig.PollingIntervalSecs, "1") + props.put(KafkaMetricsConfig.Reporters, "kafka.metrics.KafkaCSVMetricsReporter") if (options.has(metricsDirectoryOpt)) - props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + props.put(KafkaCSVMetricsReporter.MetricsDir, options.valueOf(metricsDirectoryOpt)) else - props.put("kafka.csv.metrics.dir", "kafka_metrics") - props.put("kafka.csv.metrics.reporter.enabled", "true") + props.put(KafkaCSVMetricsReporter.MetricsDir, "kafka_metrics") + props.put(KafkaCSVMetricsReporter.ReporterEnabled, "true") val verifiableProps = new VerifiableProperties(props) KafkaMetricsReporter.startReporters(verifiableProps) } @@ -174,24 +175,23 @@ object ProducerPerformance extends Logging { val allDone: CountDownLatch, val rand: Random) extends Runnable { val props = new Properties() - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("reconnect.interval", Integer.MAX_VALUE.toString) - props.put("send.buffer.bytes", (64*1024).toString) + props.put(ProducerConfig.MetadataBrokerListProp, config.brokerList) + props.put(ProducerConfig.CompressionCodecProp, config.compressionCodec.codec.toString) + props.put(SyncProducerConfigShared.SendBufferBytesProp, (64*1024).toString) if(!config.isSync) { - props.put("producer.type","async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") + props.put(ProducerConfig.ProducerTypeProp, "async") + props.put(AsyncProducerConfig.BatchNumMessagesProp, config.batchSize.toString) + props.put(AsyncProducerConfig.QueueEnqueueTimeoutMsProp, "-1") } - props.put("client.id", "ProducerPerformance") - props.put("request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("message.send.max.retries", config.producerNumRetries.toString) - props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) - props.put("serializer.class", classOf[DefaultEncoder].getName.toString) - props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString) + props.put(SyncProducerConfigShared.ClientIdProp, "ProducerPerformance") + props.put(SyncProducerConfigShared.RequestRequiredAcksProp, config.producerRequestRequiredAcks.toString) + props.put(SyncProducerConfigShared.RequestTimeoutMsProp, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.MessageSendMaxRetriesProp, config.producerNumRetries.toString) + props.put(ProducerConfig.RetryBackoffMsProp, config.producerRetryBackoffMs.toString) + props.put(AsyncProducerConfig.SerializerClassProp, classOf[DefaultEncoder].getName.toString) + 
props.put(AsyncProducerConfig.KeySerializerClassProp, classOf[NullEncoder[Long]].getName.toString) + - val producerConfig = new ProducerConfig(props) val producer = new Producer[Long, Array[Byte]](producerConfig) val seqIdNumDigit = 10 // no. of digits for max int value diff --git a/system_test/broker_failure/config/server_source1.properties b/system_test/broker_failure/config/server_source1.properties index bbf288e..dc86d6b 100644 --- a/system_test/broker_failure/config/server_source1.properties +++ b/system_test/broker_failure/config/server_source1.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source1-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/broker_failure/config/server_source2.properties b/system_test/broker_failure/config/server_source2.properties index 570bafc..eb11e02 100644 --- a/system_test/broker_failure/config/server_source2.properties +++ b/system_test/broker_failure/config/server_source2.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source2-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/broker_failure/config/server_source3.properties b/system_test/broker_failure/config/server_source3.properties index df8ff6a..2fba7aa 100644 --- a/system_test/broker_failure/config/server_source3.properties +++ b/system_test/broker_failure/config/server_source3.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source3-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/broker_failure/config/server_source4.properties b/system_test/broker_failure/config/server_source4.properties index ee9c7fd..5951f09 100644 --- a/system_test/broker_failure/config/server_source4.properties +++ b/system_test/broker_failure/config/server_source4.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source4-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/broker_failure/config/server_target1.properties b/system_test/broker_failure/config/server_target1.properties index 7f776bd..2f001af 100644 --- a/system_test/broker_failure/config/server_target1.properties +++ b/system_test/broker_failure/config/server_target1.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-target1-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/broker_failure/config/server_target2.properties b/system_test/broker_failure/config/server_target2.properties index 6d997dc..25e445f 100644 --- a/system_test/broker_failure/config/server_target2.properties +++ b/system_test/broker_failure/config/server_target2.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-target2-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/broker_failure/config/server_target3.properties b/system_test/broker_failure/config/server_target3.properties index 0d3a9ae..1d6bc12 100644 --- a/system_test/broker_failure/config/server_target3.properties +++ b/system_test/broker_failure/config/server_target3.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-target3-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/migration_tool_testsuite/config/server.properties b/system_test/migration_tool_testsuite/config/server.properties index 6ecbb71..4609578 100644 --- a/system_test/migration_tool_testsuite/config/server.properties +++ b/system_test/migration_tool_testsuite/config/server.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,7 +32,7 @@ port=9091 # The number of threads handling network requests num.network.threads=2 - + # The number of threads doing disk I/O num.io.threads=2 @@ -65,7 +65,7 @@ num.partitions=5 # There are a few important trade-offs here: # 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. # 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). -# 3. Throughput: The flush is generally the most expensive operation. +# 3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. @@ -100,15 +100,12 @@ log.retention.hours=168 #log.segment.bytes=102400 log.segment.bytes=128 -# The interval at which log segments are checked to see if they can be deleted according +# The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -# Enable connecting to zookeeper -enable.zookeeper=true - # Zk connection string (see zk docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". diff --git a/system_test/mirror_maker/config/blacklisttest.consumer.properties b/system_test/mirror_maker/config/blacklisttest.consumer.properties index ff12015..550c8b1 100644 --- a/system_test/mirror_maker/config/blacklisttest.consumer.properties +++ b/system_test/mirror_maker/config/blacklisttest.consumer.properties @@ -24,5 +24,5 @@ zk.connection.timeout.ms=1000000 #consumer group id group.id=group1 -shallow.iterator.enable=true + diff --git a/system_test/mirror_maker/config/server_source_1_1.properties b/system_test/mirror_maker/config/server_source_1_1.properties index 2f070a7..6bc49e4 100644 --- a/system_test/mirror_maker/config/server_source_1_1.properties +++ b/system_test/mirror_maker/config/server_source_1_1.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source-1-1-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/mirror_maker/config/server_source_1_2.properties b/system_test/mirror_maker/config/server_source_1_2.properties index f9353e8..5f6945b 100644 --- a/system_test/mirror_maker/config/server_source_1_2.properties +++ b/system_test/mirror_maker/config/server_source_1_2.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source-1-2-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/mirror_maker/config/server_source_2_1.properties b/system_test/mirror_maker/config/server_source_2_1.properties index daa01ad..4e1614d 100644 --- a/system_test/mirror_maker/config/server_source_2_1.properties +++ b/system_test/mirror_maker/config/server_source_2_1.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source-2-1-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. 
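The system-test broker configs (the broker_failure source and target servers above, the migration_tool_testsuite server.properties, and the mirror_maker and replication suites that follow) all lose their enable.zookeeper=true line; only that flag is removed, and the rest of the Zookeeper section is kept. The trimmed section then reads roughly as below; the comment lines are taken from the files themselves, while the final key and its value are an assumption, since the actual connection line sits outside the hunks shown:

    ############################# Zookeeper #############################

    # Zk connection string (see zk docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    zookeeper.connect=localhost:2181
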
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/mirror_maker/config/server_source_2_2.properties b/system_test/mirror_maker/config/server_source_2_2.properties index be6fdfc..7630e51 100644 --- a/system_test/mirror_maker/config/server_source_2_2.properties +++ b/system_test/mirror_maker/config/server_source_2_2.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-source-2-2-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/mirror_maker/config/server_target_1_1.properties b/system_test/mirror_maker/config/server_target_1_1.properties index d37955a..63c2483 100644 --- a/system_test/mirror_maker/config/server_target_1_1.properties +++ b/system_test/mirror_maker/config/server_target_1_1.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-target-1-1-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/mirror_maker/config/server_target_1_2.properties b/system_test/mirror_maker/config/server_target_1_2.properties index aa7546c..5fe33bb 100644 --- a/system_test/mirror_maker/config/server_target_1_2.properties +++ b/system_test/mirror_maker/config/server_target_1_2.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-target-1-2-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties index ff12015..550c8b1 100644 --- a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties +++ b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties @@ -24,5 +24,5 @@ zk.connection.timeout.ms=1000000 #consumer group id group.id=group1 -shallow.iterator.enable=true + diff --git a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties index f1a902b..ee36fad 100644 --- a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties +++ b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties @@ -24,5 +24,5 @@ zk.connection.timeout.ms=1000000 #consumer group id group.id=group1 -shallow.iterator.enable=true + diff --git a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties index e90634a..3a7b045 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties @@ -9,4 +9,4 @@ auto.offset.reset=smallest socket.receive.buffer.bytes=1048576 fetch.message.max.bytes=1048576 zookeeper.sync.time.ms=15000 -shallow.iterator.enable=false + diff --git a/system_test/mirror_maker_testsuite/config/server.properties b/system_test/mirror_maker_testsuite/config/server.properties index 36dd68d..a52b277 100644 --- a/system_test/mirror_maker_testsuite/config/server.properties +++ b/system_test/mirror_maker_testsuite/config/server.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,7 +32,7 @@ port=9091 # The number of threads handling network requests num.network.threads=2 - + # The number of threads doing disk I/O num.io.threads=2 @@ -65,7 +65,7 @@ num.partitions=5 # There are a few important trade-offs here: # 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. # 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). -# 3. 
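Similarly, the mirror-maker consumer configs above (blacklisttest, whitelisttest_1, whitelisttest_2 and mirror_consumer.properties) drop their shallow.iterator.enable lines; the patch gives no reason, but presumably the key is no longer a supported consumer setting. After the change the tail of blacklisttest.consumer.properties is simply:

    #consumer group id
    group.id=group1
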
Throughput: The flush is generally the most expensive operation. +# 3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. @@ -100,15 +100,12 @@ log.retention.bytes=-1 #log.segment.size=536870912 log.segment.bytes=102400 -# The interval at which log segments are checked to see if they can be deleted according +# The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -# Enable connecting to zookeeper -enable.zookeeper=true - # Zk connection string (see zk docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties index 9f8a633..51dccc3 100644 --- a/system_test/producer_perf/config/server.properties +++ b/system_test/producer_perf/config/server.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,7 +34,7 @@ num.threads=8 # the directory in which to store log files log.dir=/tmp/kafka-logs -# the send buffer used by the socket server +# the send buffer used by the socket server socket.send.buffer.bytes=1048576 # the receive buffer used by the socket server @@ -54,9 +54,6 @@ log.flush.interval.messages=600 #set the following properties to use zookeeper -# enable connecting to zookeeper -enable.zookeeper=true - # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties index 36dd68d..a52b277 100644 --- a/system_test/replication_testsuite/config/server.properties +++ b/system_test/replication_testsuite/config/server.properties @@ -4,9 +4,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,7 +32,7 @@ port=9091 # The number of threads handling network requests num.network.threads=2 - + # The number of threads doing disk I/O num.io.threads=2 @@ -65,7 +65,7 @@ num.partitions=5 # There are a few important trade-offs here: # 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. # 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). -# 3. 
Throughput: The flush is generally the most expensive operation. +# 3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. @@ -100,15 +100,12 @@ log.retention.bytes=-1 #log.segment.size=536870912 log.segment.bytes=102400 -# The interval at which log segments are checked to see if they can be deleted according +# The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -# Enable connecting to zookeeper -enable.zookeeper=true - # Zk connection string (see zk docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".