diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a6423f4..e6eb5b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -95,30 +95,28 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config) { log.trace("Starting the Kafka producer"); Time time = new SystemTime(); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES)) + .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS), TimeUnit.MILLISECONDS); String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : ""); - List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(jmxPrefix)); + List reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = new Partitioner(); - long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); + this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG), + config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); - this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); + this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), + this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), - retryBackoffMs, + config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time); - List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); this.sender = new Sender(new Selector(this.metrics, time), this.metadata, @@ -126,9 +124,9 @@ public class KafkaProducer implements Producer { clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), - (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), - config.getInt(ProducerConfig.RETRIES_CONFIG), - config.getInt(ProducerConfig.TIMEOUT_CONFIG), + (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), + config.getInt(ProducerConfig.MAX_RETRIES_CONFIG), + config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), 
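(The Sender argument list continues just below.) With parseAcks() removed further down in this file, acks becomes a plain integer under request.required.acks, and the old acks=all spelling is expressed as -1. A minimal sketch of client code against the key names this patch introduces; the broker address is a placeholder:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, "broker1:9092");   // placeholder host:port
            props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1");           // -1 = wait for all in-sync replicas (formerly "acks=all")
            props.setProperty(ProducerConfig.MAX_RETRIES_CONFIG, "3");              // "request.retries"; replaces the old "retries" key
            KafkaProducer producer = new KafkaProducer(props);
            producer.close();
        }
    }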
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), this.metrics, @@ -142,21 +140,13 @@ public class KafkaProducer implements Producer { log.debug("Kafka producer started"); } - private static int parseAcks(String acksString) { - try { - return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim()); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid configuration value for 'acks': " + acksString); - } - } - private static List parseAndValidateAddresses(List urls) { List addresses = new ArrayList(); for (String url : urls) { if (url != null && url.length() > 0) { String[] pieces = url.split(":"); if (pieces.length != 2) - throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url); try { InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); if (address.isUnresolved()) @@ -278,7 +268,7 @@ public class KafkaProducer implements Producer { if (size > this.totalMemorySize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + - ProducerConfig.BUFFER_MEMORY_CONFIG + + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG + " configuration."); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index bc4074e..259c14b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -19,198 +19,171 @@ import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; /** - * Configuration for the Kafka Producer. Documentation for these configurations can be found in the Kafka documentation + * The producer configuration keys */ public class ProducerConfig extends AbstractConfig { - /* - * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND - * CHANGE WILL BREAK USER CODE. - */ - private static final ConfigDef config; - /** bootstrap.servers */ - public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; - private static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping—this list only " - + "impacts the initial hosts used to discover the full set of servers. This list should be in the form " - + "host1:port1,host2:port2,.... Since these servers are just used for the initial connection to " - + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of " - + "servers (you may want more than one, though, in case a server is down). If no server in this list is available sending " - + "data will fail until on becomes available."; + /** + * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form + * host1:port1,host2:port2,.... 
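(The metadata.broker.list description continues below.) Because parseAndValidateAddresses() above splits each entry on ':' and resolves the host eagerly, every element of metadata.broker.list must be exactly host:port and must resolve. A hedged fragment illustrating that check, with a hypothetical host name:

    import java.net.InetSocketAddress;

    // Mirrors the per-entry check in parseAndValidateAddresses (hypothetical host).
    String url = "kafka1.example.com:9092";
    String[] pieces = url.split(":");
    boolean wellFormed = pieces.length == 2;          // otherwise a ConfigException is thrown
    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); // resolves immediately
    boolean resolvable = !address.isUnresolved();     // unresolved hosts are rejected as well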
These urls are just used for the initial connection to discover the + * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you + * may want more than one, though, in case a server is down). + */ + public static final String BROKER_LIST_CONFIG = "metadata.broker.list"; - /** metadata.fetch.timeout.ms */ + /** + * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that + * topic. + */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; - private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata " - + "fetch to succeed before throwing an exception back to the client."; - - /** metadata.max.age.ms */ - public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; - private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions."; - - /** batch.size */ - public static final String BATCH_SIZE_CONFIG = "batch.size"; - private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the " - + "default batch size in bytes. " - + "
<p>" - + "No attempt will be made to batch records larger than this size. " - + "<p>" - + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. " - + "<p>" - + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable " - + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a " - + "buffer of the specified batch size in anticipation of additional records."; - - /** buffer.memory */ - public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; - private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " + "sent faster than they can be delivered to the server the producer will either block or throw an exception based " - + "on the preference specified by block.on.buffer.full. " - + "<p>" - + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since " - + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if " - + "compression is enabled) as well as for maintaining in-flight requests."; - - /** acks */ - public static final String ACKS_CONFIG = "acks"; - private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: " - + " <ul>" - + " <li>acks=0 If set to zero then the producer will not wait for any acknowledgment from the" + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be" + " made that the server has received the record in this case, and the retries configuration will not" + " take effect (as the client won't generally know of any failures). The offset given back for each record will" + " always be set to -1." - + " <li>acks=1 This will mean the leader will write the record to its local log but will respond" + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after" + " acknowledging the record but before the followers have replicated it then the record will be lost." - + " <li>acks=all This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee." - + "
  • Other settings such as acks=2 are also possible, and will require the given number of" - + " acknowledgements but this is generally less useful."; - - /** timeout.ms */ - public static final String TIMEOUT_CONFIG = "timeout.ms"; - private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the acks configuration. If the " - + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout " - + "is measured on the server side and does not include the network latency of the request."; - - /** linger.ms */ + + /** + * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly. + */ + public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms"; + + /** + * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any + * leadership changes. + */ + public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms"; + + /** + * The buffer size allocated for a partition. When records are received which are smaller than this size the + * producer will attempt to optimistically group them together until this size is reached. + */ + public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes"; + + /** + * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent + * faster than they can be delivered to the server the producer will either block or throw an exception based on the + * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}. + */ + public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; + + /** + * The number of acknowledgments the producer requires from the server before considering a request complete. + */ + public static final String REQUIRED_ACKS_CONFIG = "request.required.acks"; + + /** + * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment + * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be + * returned. + */ + public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms"; + + /** + * The producer groups together any records that arrive in between request sends. Normally this occurs only under + * load when records arrive faster than they can be sent out. However the client can reduce the number of requests + * and increase throughput by adding a small amount of artificial delay to force more records to batch together. + * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records + * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many + * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up. + * This setting defaults to 0. + */ public static final String LINGER_MS_CONFIG = "linger.ms"; - private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " + "Normally this occurs only under load when records arrive faster than they can be sent out. 
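(The removed linger.ms description continues below.) The trade-off it describes is unchanged under the renamed keys: linger.ms adds a small artificial delay so more records can batch together, while max.partition.bytes caps how much may accumulate before a batch is sent regardless. A fragment continuing the Properties sketch from earlier:

    props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5");               // wait up to 5 ms for more records per batch
    props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, "65536");  // but send as soon as 64 KB is buffered for a partition
    props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(32 * 1024 * 1024)); // overall cap (the default)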
However in some circumstances the client may want to " - + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount " - + "of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to " - + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought " - + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once " - + "we get batch.size worth of records for a partition it will be sent immediately regardless of this " - + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the " - + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, " - + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load."; - - /** client.id */ + + /** + * The id string to pass to the server when making requests. The purpose of this is to be able to track the source + * of requests beyond just ip/port by allowing a logical application name to be included. + */ public static final String CLIENT_ID_CONFIG = "client.id"; - private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The " - + "application can set any string it wants as this has no functional purpose other than in logging and metrics."; - /** send.buffer.bytes */ + /** + * The size of the TCP send buffer to use when sending data + */ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; - private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data"; - /** receive.buffer.bytes */ + /** + * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this) + */ public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; - private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data"; - /** max.request.size */ + /** + * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server + * has its own cap on record size which may be different from this. + */ public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size"; - private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record " - + "batches the producer will send in a single request to avoid sending huge requests."; - /** reconnect.backoff.ms */ + /** + * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a + * host in a tight loop. + */ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; - private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." 
+ " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop."; - /** block.on.buffer.full */ + /** + * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default + * this setting is true and we block, however users who want to guarantee we never block can turn this into an + * error. + */ public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; - private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to " - + "immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full."; - /** retries */ - public static final String RETRIES_CONFIG = "retries"; - private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the " - + "error. Allowing retries will potentially change the ordering of records because if two records are " - + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second record " - + "may appear first."; + /** + * The maximum number of times to attempt resending the request before giving up. + */ + public static final String MAX_RETRIES_CONFIG = "request.retries"; - /** retry.backoff.ms */ + /** + * The amount of time to wait before attempting to resend produce request to a given topic partition. This avoids + * repeated sending-and-failing in a tight loop + */ public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; - private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop."; - /** compression.type */ + /** + * The compression type for all data generated. The default is none (i.e. no compression) + */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; - private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, or snappy. Compression is of full batches of data, " - + " so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; - /** metrics.sample.window.ms */ - public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; - private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. " - + "When a window expires we erase and overwrite the oldest window."; + /** + * The window size for a single metrics sample in ms. Defaults to 30 seconds. 
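(The javadoc closes just below.) These two sampling settings, together with enable.jmx, replace the pluggable metric.reporters list that this patch removes from KafkaProducer; the constructor now registers a single JmxReporter directly. A sketch of that wiring using the defaults named here; package locations are assumed from this patch's file layout:

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.metrics.JmxReporter;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.MetricsReporter;
    import org.apache.kafka.common.utils.SystemTime;

    MetricConfig metricConfig = new MetricConfig()
        .samples(2)                                  // metrics.num.samples default
        .timeWindow(30000, TimeUnit.MILLISECONDS);   // metrics.sample.window.ms default
    List<MetricsReporter> reporters =
        Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")); // the producer appends the client id when one is set
    Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());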
+ */ + public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms"; - /** metrics.num.samples */ - public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; - private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; + /** + * The number of samples used when reporting metrics. Defaults to two. So by default we use two 30 second windows, + * so metrics are computed over up to 60 seconds. + */ + public static final String METRICS_NUM_SAMPLES = "metrics.num.samples"; - /** metric.reporters */ - public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; - private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + /** + * Should we register the Kafka metrics as JMX mbeans? + */ + public static final String ENABLE_JMX_CONFIG = "enable.jmx"; static { - config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) - .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) - .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) - .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC) - .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) - .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) - .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) - .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) - .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC) - .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC) - .define(MAX_REQUEST_SIZE_CONFIG, - Type.INT, - 1 * 1024 * 1024, - atLeast(0), - Importance.MEDIUM, - MAX_REQUEST_SIZE_DOC) - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) - .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) - .define(METADATA_FETCH_TIMEOUT_CONFIG, - Type.LONG, - 60 * 1000, - atLeast(0), - Importance.LOW, - METADATA_FETCH_TIMEOUT_DOC) - .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) - .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, - Type.LONG, - 30000, - atLeast(0), - Importance.LOW, - METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC); + /* TODO: add docs */ + config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah") + .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah") + .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah") + .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah") + .define(MAX_PARTITION_SIZE_CONFIG, 
Type.INT, 16384, atLeast(0), "blah blah") + .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah") + /* TODO: should be a string to handle acks=in-sync */ + .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah") + .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah") + .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah") + .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah") + .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") + .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") + .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah") + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") + .define(METRICS_SAMPLE_WINDOW_MS, Type.LONG, 30000, atLeast(0), "") + .define(METRICS_NUM_SAMPLES, Type.INT, 2, atLeast(1), ""); } ProducerConfig(Map props) { super(config, props); } - public static void main(String[] args) { - System.out.println(config.toHtmlTable()); - } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index eb18739..3b3fb2c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -36,12 +36,12 @@ public class ProducerPerformance { int recordSize = Integer.parseInt(args[3]); int acks = Integer.parseInt(args[4]); Properties props = new Properties(); - props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks)); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url); + props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks)); + props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url); props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000)); - props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); - props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); - props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024)); + props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); + props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); + props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(64 * 1024)); if (args.length == 6) props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]); diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 8d88610..84a327e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -12,7 +12,6 @@ */ package org.apache.kafka.common.config; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -112,9 +111,6 @@ public 
class AbstractConfig { log.info(b.toString()); } - /** - * Log warnings for any unused configurations - */ public void logUnused() { for (String key : unused()) log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key)); @@ -140,21 +136,4 @@ public class AbstractConfig { return t.cast(o); } - public List getConfiguredInstances(String key, Class t) { - List klasses = getList(key); - List objects = new ArrayList(); - for (String klass : klasses) { - Class c = getClass(klass); - if (c == null) - return null; - Object o = Utils.newInstance(c); - if (!t.isInstance(o)) - throw new KafkaException(c.getName() + " is not an instance of " + t.getName()); - if (o instanceof Configurable) - ((Configurable) o).configure(this.originals); - objects.add(t.cast(o)); - } - return objects; - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 9ba7ee7..67b349d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -1,21 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.kafka.common.config; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,7 +45,7 @@ import java.util.Map; */ public class ConfigDef { - private static final Object NO_DEFAULT_VALUE = new String(""); + private static final Object NO_DEFAULT_VALUE = new Object(); private final Map configKeys = new HashMap(); @@ -54,15 +55,14 @@ public class ConfigDef { * @param type The type of the config * @param defaultValue The default value to use if this config isn't present * @param validator A validator to use in checking the correctness of the config - * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) { if (configKeys.containsKey(name)) throw new ConfigException("Configuration " + name + " is defined twice."); Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type); - configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation)); + configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation)); return this; } @@ -71,12 +71,11 @@ public class ConfigDef { * @param name The name of the config parameter * @param type The type of the config * @param defaultValue The default value to use if this config isn't present - * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) { - return define(name, type, defaultValue, null, importance, documentation); + public ConfigDef define(String name, Type type, Object defaultValue, String documentation) { + return define(name, type, defaultValue, null, documentation); } /** @@ -84,24 +83,22 @@ public class ConfigDef { * @param name The name of the config parameter * @param type The type of the config * @param validator A validator to use in checking the correctness of the config - * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) { - return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation); + public ConfigDef define(String name, Type type, Validator validator, String documentation) { + return define(name, type, NO_DEFAULT_VALUE, validator, documentation); } /** * Define a required parameter with no default value and no special validation logic * @param name The name of the config parameter * @param type The type of the config - * @param importance The importance of this config: is this something you will likely need to change. 
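(The @param list for define() continues below.) With the Importance argument dropped from every define() overload, defining and parsing a config reads as follows; a minimal sketch against the post-patch signatures, with illustrative key names and doc strings:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Type;
    import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

    ConfigDef def = new ConfigDef()
        .define("linger.ms", Type.LONG, 0, atLeast(0L), "time to wait for more records before sending")
        .define("client.id", Type.STRING, "", "logical name for this client");

    Map<String, Object> raw = new HashMap<String, Object>();
    raw.put("linger.ms", "5");                       // strings are coerced to the declared type
    Map<String, Object> parsed = def.parse(raw);     // "client.id" falls back to its default ""
    long lingerMs = (Long) parsed.get("linger.ms");

    // The reworked Range: atLeast(1) now stands for [1, Double.MAX_VALUE], and a violation throws a
    // ConfigException whose message quotes that whole range, e.g. "Value must be in the range [1, 1.7976931348623157E308]".
    atLeast(1).ensureValid("metrics.num.samples", 0);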
* @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Importance importance, String documentation) { - return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation); + public ConfigDef define(String name, Type type, String documentation) { + return define(name, type, NO_DEFAULT_VALUE, null, documentation); } /** @@ -209,10 +206,6 @@ public class ConfigDef { BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS; } - public enum Importance { - HIGH, MEDIUM, LOW - } - /** * Validation logic the user may provide */ @@ -237,7 +230,7 @@ public class ConfigDef { * @param min The minimum acceptable value */ public static Range atLeast(Number min) { - return new Range(min, null); + return new Range(min, Double.MAX_VALUE); } /** @@ -249,19 +242,8 @@ public class ConfigDef { public void ensureValid(String name, Object o) { Number n = (Number) o; - if (min != null && n.doubleValue() < min.doubleValue()) - throw new ConfigException(name, o, "Value must be at least " + min); - if (max != null && n.doubleValue() > max.doubleValue()) - throw new ConfigException(name, o, "Value must be no more than " + max); - } - - public String toString() { - if (min == null) - return "[...," + max + "]"; - else if (max == null) - return "[" + min + ",...]"; - else - return "[" + min + ",...," + max + "]"; + if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue()) + throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]"); } } @@ -271,75 +253,17 @@ public class ConfigDef { public final String documentation; public final Object defaultValue; public final Validator validator; - public final Importance importance; - public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { + public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) { super(); this.name = name; this.type = type; this.defaultValue = defaultValue; this.validator = validator; - this.importance = importance; if (this.validator != null) this.validator.ensureValid(name, defaultValue); this.documentation = documentation; } - public boolean hasDefault() { - return this.defaultValue != NO_DEFAULT_VALUE; - } - - } - - public String toHtmlTable() { - // sort first required fields, then by importance, then name - List configs = new ArrayList(this.configKeys.values()); - Collections.sort(configs, new Comparator() { - public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) { - // first take anything with no default value - if (!k1.hasDefault() && k2.hasDefault()) - return -1; - else if (!k2.hasDefault() && k1.hasDefault()) - return 1; - - // then sort by importance - int cmp = k1.importance.compareTo(k2.importance); - if (cmp == 0) - // then sort in alphabetical order - return k1.name.compareTo(k2.name); - else - return cmp; - } - }); - StringBuilder b = new StringBuilder(); - b.append("\n"); - b.append("\n"); - b.append("\n"); - b.append("\n"); - b.append("\n"); - b.append("\n"); - b.append("\n"); - b.append("\n"); - for (ConfigKey def : configs) { - b.append("\n"); - b.append(""); - b.append(""); - b.append(""); - b.append(""); - b.append(""); - b.append("\n"); - } - b.append("
    NameTypeDefaultImportanceDescription
    "); - b.append(def.name); - b.append(""); - b.append(def.type.toString().toLowerCase()); - b.append(""); - b.append(def.defaultValue == null ? "" : def.defaultValue); - b.append(""); - b.append(def.importance.toString().toLowerCase()); - b.append(""); - b.append(def.documentation); - b.append("
    "); - return b.toString(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 3c31201..3950eb1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -42,7 +42,7 @@ public class JmxReporter implements MetricsReporter { private static final Logger log = LoggerFactory.getLogger(JmxReporter.class); private static final Object lock = new Object(); - private String prefix; + private final String prefix; private final Map mbeans = new HashMap(); public JmxReporter() { @@ -57,10 +57,6 @@ public class JmxReporter implements MetricsReporter { } @Override - public void configure(Map configs) { - } - - @Override public void init(List metrics) { synchronized (lock) { for (KafkaMetric metric : metrics) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 7acc19e..2c395b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -1,25 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.kafka.common.metrics; import java.util.List; -import org.apache.kafka.common.Configurable; - /** - * A plugin interface to allow things to listen as new metrics are created so they can be reported. 
+ * A plugin interface to allow things to listen as new metrics are created so they can be reported */ -public interface MetricsReporter extends Configurable { +public interface MetricsReporter { /** * This is called when the reporter is first registered to initially register all existing metrics diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 09a82fe..29543df 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -1,14 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.kafka.common.config; @@ -20,7 +24,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.common.config.ConfigDef.Importance; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigDef.Range; import org.apache.kafka.common.config.ConfigDef.Type; import org.junit.Test; @@ -29,13 +35,13 @@ public class ConfigDefTest { @Test public void testBasicTypes() { - ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs") - .define("b", Type.LONG, Importance.HIGH, "docs") - .define("c", Type.STRING, "hello", Importance.HIGH, "docs") - .define("d", Type.LIST, Importance.HIGH, "docs") - .define("e", Type.DOUBLE, Importance.HIGH, "docs") - .define("f", Type.CLASS, Importance.HIGH, "docs") - .define("g", Type.BOOLEAN, Importance.HIGH, "docs"); + ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs") + .define("b", Type.LONG, "docs") + .define("c", Type.STRING, "hello", "docs") + .define("d", Type.LIST, "docs") + .define("e", Type.DOUBLE, "docs") + .define("f", Type.CLASS, "docs") + .define("g", Type.BOOLEAN, "docs"); Properties props = new Properties(); props.put("a", "1 "); @@ -57,22 +63,22 @@ public class ConfigDefTest { @Test(expected = ConfigException.class) public void testInvalidDefault() { - new ConfigDef().define("a", Type.INT, "hello", Importance.HIGH, "docs"); + new ConfigDef().define("a", Type.INT, "hello", "docs"); } @Test(expected = ConfigException.class) public void testNullDefault() { - new ConfigDef().define("a", Type.INT, null, null, null, "docs"); + new ConfigDef().define("a", Type.INT, null, null, "docs"); } @Test(expected = ConfigException.class) public void testMissingRequired() { - new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs").parse(new HashMap()); + new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap()); } @Test(expected = ConfigException.class) public void testDefinedTwice() { - new ConfigDef().define("a", Type.STRING, Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs"); + new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs"); } @Test @@ -88,7 +94,7 @@ public class ConfigDefTest { for (Object value : values) { Map m = new HashMap(); m.put("name", value); - ConfigDef def = new ConfigDef().define("name", type, Importance.HIGH, "docs"); + ConfigDef def = new ConfigDef().define("name", type, "docs"); try { def.parse(m); fail("Expected a config exception on bad input for value " + value); diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index 0513a59..649fcb6 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -32,9 +32,11 @@ object TopicMetadata { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val topic = readShortString(buffer) val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) - val partitionsMetadata = new ArrayBuffer[PartitionMetadata]() - for(i <- 0 until numPartitions) - partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers) + val partitionsMetadata: Array[PartitionMetadata] = Array.fill(numPartitions){null} + for(i <- 0 until numPartitions) { + val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers) + 
partitionsMetadata(partitionMetadata.partitionId) = partitionMetadata + } new TopicMetadata(topic, partitionsMetadata, errorCode) } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index fcabd0d..d44b6af 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -36,7 +36,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger -import org.apache.log4j.Logger import java.util.concurrent.locks.ReentrantLock import scala.Some import kafka.common.TopicAndPartition diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c068ef6..e5db5a3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -31,7 +31,10 @@ import kafka.utils.{Pool, SystemTime, Logging} import kafka.network.RequestChannel.Response import kafka.cluster.Broker import kafka.controller.KafkaController +import kafka.utils.Utils.inLock import org.I0Itec.zkclient.ZkClient +import java.util.concurrent.locks.ReentrantReadWriteLock +import kafka.controller.KafkaController.StateChangeLogger /** * Logic to handle the various Kafka requests @@ -51,12 +54,54 @@ class KafkaApis(val requestChannel: RequestChannel, private val delayedRequestMetrics = new DelayedRequestMetrics /* following 3 data structures are updated by the update metadata request * and is queried by the topic metadata request. */ - var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] = - new mutable.HashMap[TopicAndPartition, PartitionStateInfo]() + var metadataCache = new MetadataCache private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() - private val partitionMetadataLock = new Object + private val partitionMetadataLock = new ReentrantReadWriteLock() this.logIdent = "[KafkaApi-%d] ".format(brokerId) + class MetadataCache { + val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] = + new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]() + + def addPartitionInfo(topic: String, + partitionId: Int, + stateInfo: PartitionStateInfo) { + cache.get(topic) match { + case Some(infos) => infos.put(partitionId, stateInfo) + case None => { + val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo] + cache.put(topic, newInfos) + newInfos.put(partitionId, stateInfo) + } + } + } + + def containsTopicAndPartition(topic: String, + partitionId: Int): Boolean = { + cache.get(topic) match { + case Some(partitionInfos) => partitionInfos.contains(partitionId) + case None => false + } + } + + def allTopics = cache.keySet + + def removeTopic(topic: String) = cache.remove(topic) + + def containsTopic(topic: String) = cache.contains(topic) + + def updateCache(updateMetadataRequest: UpdateMetadataRequest, + brokerId: Int, + stateChangeLogger: StateChangeLogger) = { + updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) => + addPartitionInfo(tp.topic, tp.partition, info) + stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d").format(brokerId, info, tp, + 
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) + } + } + } + /** * Top-level method that handles all requests and multiplexes to the right api */ @@ -87,8 +132,10 @@ class KafkaApis(val requestChannel: RequestChannel, // ensureTopicExists is only for client facing requests private def ensureTopicExists(topic: String) = { - if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)} ) - throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted") + inLock(partitionMetadataLock.readLock()) { + if (!metadataCache.containsTopic(topic)) + throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted") + } } def handleLeaderAndIsrRequest(request: RequestChannel.Request) { @@ -132,31 +179,25 @@ class KafkaApis(val requestChannel: RequestChannel, stateChangeLogger.warn(stateControllerEpochErrorMessage) throw new ControllerMovedException(stateControllerEpochErrorMessage) } - partitionMetadataLock synchronized { + inLock(partitionMetadataLock.writeLock()) { replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch // cache the list of alive brokers in the cluster updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) - updateMetadataRequest.partitionStateInfos.foreach { partitionState => - metadataCache.put(partitionState._1, partitionState._2) - stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + - "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1, - updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) - } + metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger) // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are // currently being deleted by the controller - val topicsKnownToThisBroker = metadataCache.map { - case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet + val topicsKnownToThisBroker = metadataCache.allTopics val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController - val partitionsToBeDeleted = metadataCache.filter { - case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic) - }.keySet - partitionsToBeDeleted.foreach { partition => - metadataCache.remove(partition) - stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " + - "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition, - updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) + deletedTopics.foreach { topic => + val partitionStateInfos = metadataCache.cache(topic) + partitionStateInfos.keySet.foreach { partition => + stateChangeLogger.trace(("Broker %d deleted partition %s for topic %s from metadata cache in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition, topic, + updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, 
updateMetadataRequest.correlationId)) + } + metadataCache.removeTopic(topic) } } val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) @@ -610,62 +651,68 @@ class KafkaApis(val requestChannel: RequestChannel, private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = { val config = replicaManager.config - partitionMetadataLock synchronized { - topics.map { topic => - if(metadataCache.keySet.map(_.topic).contains(topic)) { - val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic)) - val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition) - val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) => - val replicas = metadataCache(topicAndPartition).allReplicas - val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq - var leaderInfo: Option[Broker] = None - var isrInfo: Seq[Broker] = Nil - val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch - val leader = leaderIsrAndEpoch.leaderAndIsr.leader - val isr = leaderIsrAndEpoch.leaderAndIsr.isr - debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader) - try { - if(aliveBrokers.keySet.contains(leader)) - leaderInfo = Some(aliveBrokers(leader)) - else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition)) - isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null) - if(replicaInfo.size < replicas.size) - throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + - replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) - if(isrInfo.size < isr.size) - throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + - isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) - new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) - } catch { - case e: Throwable => - error("Error while fetching metadata for partition %s".format(topicAndPartition), e) - new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, - ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - } + // Returning all topics when requested topics are empty + val isAllTopics = topics.isEmpty + val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] + val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String] + + inLock(partitionMetadataLock.readLock()) { + val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics + for (topic <- topicsRequested) { + if (isAllTopics || metadataCache.containsTopic(topic)) { + val partitionStateInfos = metadataCache.cache(topic) + val partitionMetadata = partitionStateInfos.map { + case (partitionId, partitionState) => + val replicas = partitionState.allReplicas + val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq + var leaderInfo: Option[Broker] = None + var isrInfo: Seq[Broker] = Nil + val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch + val leader = leaderIsrAndEpoch.leaderAndIsr.leader + val isr = leaderIsrAndEpoch.leaderAndIsr.isr + debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader) + try { + 
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  error("Error while fetching metadata for topic %s partition %s".format(topic, partitionId), e)
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
           }
-          new TopicMetadata(topic, partitionMetadata)
+          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
+          topicsToBeCreated += topic
         } else {
-          // topic doesn't exist, send appropriate error code after handling auto create topics
-          val isOffsetsTopic = topic == OffsetManager.OffsetsTopicName
-          if (config.autoCreateTopicsEnable || isOffsetsTopic) {
-            try {
-              if (isOffsetsTopic)
-                AdminUtils.createTopic(zkClient, topic,
-                  config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
-              else
-                AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            } catch {
-              case e: TopicExistsException => // let it go, possibly another broker created this topic
-            }
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-          } else {
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-          }
+          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
       }
-    }.toSeq
+    }
+
+    topicResponses.appendAll(topicsToBeCreated.map { topic =>
+      try {
+        if (topic == OffsetManager.OffsetsTopicName)
+          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
+        else
+          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+      } catch {
+        case e: TopicExistsException => // let it go, possibly another broker created this topic
+      }
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+    })
+
+    topicResponses
   }
 
   /**
@@ -673,17 +720,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadata = getTopicMetadata(uniqueTopics)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
     trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
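[Editor's note] One design point in the getTopicMetadata rewrite above: topics that need auto-creation are only collected into topicsToBeCreated while the read lock is held, and AdminUtils.createTopic (a ZooKeeper write) runs after the lock is released, with LeaderNotAvailable returned so clients retry. The sketch below is illustrative only; TopicMetadataSketch, the simplified cache, and the placeholder error codes are inventions for this note, not code from the patch.

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    object TopicMetadataSketch {
      case class TopicMetadata(topic: String, errorCode: Short)

      // Placeholder error codes for the sketch, not the real ErrorMapping values.
      val NoError: Short = 0
      val UnknownTopicOrPartitionCode: Short = 1
      val LeaderNotAvailableCode: Short = 2

      private val partitionMetadataLock = new ReentrantReadWriteLock()
      private val cache = mutable.Map[String, Map[Int, String]]()  // topic -> partition -> state (simplified)

      // Stand-in for AdminUtils.createTopic, which writes to ZooKeeper in the real code.
      private def createTopic(topic: String): Unit = ()

      def getTopicMetadata(topics: Set[String], autoCreate: Boolean): Seq[TopicMetadata] = {
        val topicResponses = mutable.ListBuffer[TopicMetadata]()
        val topicsToBeCreated = mutable.ListBuffer[String]()

        partitionMetadataLock.readLock().lock()
        try {
          // An empty request set means "all topics", answered straight from the cache.
          val requested = if (topics.isEmpty) cache.keySet.toSet else topics
          for (topic <- requested) {
            if (cache.contains(topic))
              topicResponses += TopicMetadata(topic, NoError)
            else if (autoCreate)
              topicsToBeCreated += topic          // defer the blocking work
            else
              topicResponses += TopicMetadata(topic, UnknownTopicOrPartitionCode)
          }
        } finally {
          partitionMetadataLock.readLock().unlock()
        }

        // Auto-creation happens outside the read lock; the caller sees
        // LeaderNotAvailable and retries once the topic exists.
        topicResponses ++= topicsToBeCreated.map { topic =>
          createTopic(topic)
          TopicMetadata(topic, LeaderNotAvailableCode)
        }
        topicResponses
      }
    }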
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 2dad20e..1d73aca 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -71,7 +71,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
 
   def testCompression() {
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     var producer = new KafkaProducer(props)
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index ef56044..525a060 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -67,11 +67,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long,
                            blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = {
     val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
+    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString)
     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
     return new KafkaProducer(producerProps)
   }
 
@@ -314,10 +314,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
     var failed = false
     val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString)
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
-    producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString)
+    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString)
+    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
     val producer = new KafkaProducer(producerProps)
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 60e68c7..3c37330 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -91,7 +91,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testSendOffset() {
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
     val callback = new CheckErrorCallback
 
@@ -149,7 +149,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testClose() {
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
 
     try {
@@ -187,8 +187,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testSendToPartition() {
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    props.put(ProducerConfig.ACKS_CONFIG, "-1")
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1")
     var producer = new KafkaProducer(props)
 
     try {
@@ -245,7 +245,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testAutoCreateTopic() {
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
 
     try {
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index d5644ea..cb50e69 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -320,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.cache(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      var partitionStateInfo = activeServers.head.apis.metadataCache.cache(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -331,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      partitionStateInfo = activeServers.head.apis.metadataCache.cache(topic)(partition)
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.cache(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.cache(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     } finally {
       servers.foreach(_.shutdown())
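[Editor's note] The test changes here and in the next two files stop treating the broker's cache as a plain map keyed by TopicAndPartition and instead go through cache(topic)(partition), addPartitionInfo and containsTopicAndPartition. A rough, self-contained sketch of that access pattern follows; the LeaderAndIsr case class and waitUntilTrue helper are hypothetical stand-ins for the real test utilities, not code from the patch.

    import scala.collection.mutable

    object CacheLookupSketch {
      // Hypothetical stand-in for the state the tests read back out of the cache.
      case class LeaderAndIsr(leader: Int, isr: Seq[Int])

      // topic -> (partitionId -> state), matching the cache(topic)(partition) lookups above.
      private val cache = mutable.Map[String, mutable.Map[Int, LeaderAndIsr]]()

      def addPartitionInfo(topic: String, partitionId: Int, state: LeaderAndIsr): Unit =
        cache.getOrElseUpdate(topic, mutable.Map.empty).put(partitionId, state)

      def containsTopicAndPartition(topic: String, partitionId: Int): Boolean =
        cache.get(topic).exists(_.contains(partitionId))

      // Poll until a condition holds, in the spirit of TestUtils.waitUntilTrue.
      def waitUntilTrue(condition: () => Boolean, timeoutMs: Long): Boolean = {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (!condition()) {
          if (System.currentTimeMillis() > deadline) return false
          Thread.sleep(100)
        }
        true
      }

      def main(args: Array[String]): Unit = {
        addPartitionInfo("test", 0, LeaderAndIsr(leader = 0, isr = Seq(0, 1)))
        assert(waitUntilTrue(() => containsTopicAndPartition("test", 0), 1000))
        // AdminTest-style read: pull the leader straight out of the nested map.
        assert(cache("test")(0).leader == 0)
      }
    }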
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 22bb6f2..17b08e1 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2054c25..71ab6e1 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -531,7 +531,7 @@ object TestUtils extends Logging {
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
     assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
+        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
   }
 
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 1490bdb..9e4ebaf 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -204,16 +204,15 @@ object ProducerPerformance extends Logging {
   }
 
   class NewShinyProducer(config: ProducerPerfConfig) extends Producer {
-    import org.apache.kafka.clients.producer.ProducerConfig
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-    props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
-    props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test")
-    props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
-    props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
-    props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
-    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
+    props.put("metadata.broker.list", config.brokerList)
+    props.put("send.buffer.bytes", (64 * 1024).toString)
+    props.put("client.id", "perf-test")
+    props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
+    props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
+    props.put("request.retries", config.producerNumRetries.toString)
+    props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
+    props.put("compression.type", config.compressionCodec.name)
    val producer = new KafkaProducer(props)
 
     def send(topic: String, partition: Long, bytes: Array[Byte]) {
diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
index 7f48b07..3ec69fa 100644
--- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties
+++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
@@ -1,5 +1,6 @@
 block.on.buffer.full=true
-bootstrap.servers=localhost:9094
+metadata.broker.list=localhost:9094
+compression.codec=0
 compression.type=none
-retries=3
-acks=1
+request.retries=3
+request.required.acks=1
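[Editor's note] The last two files switch the perf producer and the mirror-maker system test from ProducerConfig constants to literal property names. For reference, a minimal sketch of assembling such a Properties set with the raw keys used above; the values here are illustrative defaults chosen for this note, not values taken from the patch, and the "was ..." comments simply record which constant each key replaced in ProducerPerformance.

    import java.util.Properties

    object PerfProducerProps {
      def build(brokerList: String): Properties = {
        val props = new Properties()
        props.put("metadata.broker.list", brokerList)        // was ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("client.id", "perf-test")                  // was ProducerConfig.CLIENT_ID_CONFIG
        props.put("request.required.acks", "1")              // was ProducerConfig.ACKS_CONFIG
        props.put("request.timeout.ms", "3000")              // was ProducerConfig.TIMEOUT_CONFIG
        props.put("request.retries", "3")                    // was ProducerConfig.RETRIES_CONFIG
        props.put("retry.backoff.ms", "100")                 // was ProducerConfig.RETRY_BACKOFF_MS_CONFIG
        props.put("send.buffer.bytes", (64 * 1024).toString) // was ProducerConfig.SEND_BUFFER_CONFIG
        props.put("compression.type", "none")                // was ProducerConfig.COMPRESSION_TYPE_CONFIG
        props
      }
    }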