diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a6423f4..e6eb5b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -95,30 +95,28 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config) { log.trace("Starting the Kafka producer"); Time time = new SystemTime(); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES)) + .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS), TimeUnit.MILLISECONDS); String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : ""); - List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(jmxPrefix)); + List reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = new Partitioner(); - long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); + this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG), + config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); - this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); + this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), + this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), - retryBackoffMs, + config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time); - List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); this.sender = new Sender(new Selector(this.metrics, time), this.metadata, @@ -126,9 +124,9 @@ public class KafkaProducer implements Producer { clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), - (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), - config.getInt(ProducerConfig.RETRIES_CONFIG), - config.getInt(ProducerConfig.TIMEOUT_CONFIG), + (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), + config.getInt(ProducerConfig.MAX_RETRIES_CONFIG), + config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), 
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), this.metrics, @@ -142,21 +140,13 @@ public class KafkaProducer implements Producer { log.debug("Kafka producer started"); } - private static int parseAcks(String acksString) { - try { - return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim()); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid configuration value for 'acks': " + acksString); - } - } - private static List parseAndValidateAddresses(List urls) { List addresses = new ArrayList(); for (String url : urls) { if (url != null && url.length() > 0) { String[] pieces = url.split(":"); if (pieces.length != 2) - throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url); try { InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); if (address.isUnresolved()) @@ -278,7 +268,7 @@ public class KafkaProducer implements Producer { if (size > this.totalMemorySize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + - ProducerConfig.BUFFER_MEMORY_CONFIG + + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG + " configuration."); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index bc4074e..259c14b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -19,198 +19,171 @@ import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; /** - * Configuration for the Kafka Producer. Documentation for these configurations can be found in the Kafka documentation + * The producer configuration keys */ public class ProducerConfig extends AbstractConfig { - /* - * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND - * CHANGE WILL BREAK USER CODE. - */ - private static final ConfigDef config; - /** bootstrap.servers */ - public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; - private static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping—this list only " - + "impacts the initial hosts used to discover the full set of servers. This list should be in the form " - + "host1:port1,host2:port2,.... Since these servers are just used for the initial connection to " - + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of " - + "servers (you may want more than one, though, in case a server is down). If no server in this list is available sending " - + "data will fail until on becomes available."; + /** + * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form + * host1:port1,host2:port2,.... 
These urls are just used for the initial connection to discover the + full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you + may want more than one, though, in case a server is down). + */ + public static final String BROKER_LIST_CONFIG = "metadata.broker.list"; - /** metadata.fetch.timeout.ms */ + /** + * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that + * topic. + */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; - private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata " - + "fetch to succeed before throwing an exception back to the client."; - - /** metadata.max.age.ms */ - public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; - private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions.";
- - /** batch.size */ - public static final String BATCH_SIZE_CONFIG = "batch.size"; - private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the " - + "default batch size in bytes. " - + "<p>" - + "No attempt will be made to batch records larger than this size. " - + "<p>" - + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. " - + "<p>" - + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable " - + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a " - + "buffer of the specified batch size in anticipation of additional records.";
- - /** buffer.memory */ - public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; - private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " + "sent faster than they can be delivered to the server the producer will either block or throw an exception based " - + "on the preference specified by <code>block.on.buffer.full</code>. " - + "<p>" - + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since " - + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if " - + "compression is enabled) as well as for maintaining in-flight requests.";
- - /** acks */ - public static final String ACKS_CONFIG = "acks"; - private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: " - + " <ul>" - + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the" + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be" + " made that the server has received the record in this case, and the <code>retries</code> configuration will not" + " take effect (as the client won't generally know of any failures). The offset given back for each record will" + " always be set to -1." - + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond" + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after" + " acknowledging the record but before the followers have replicated it then the record will be lost." - + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee." - + " <li>Other settings such as <code>acks=2</code> are also possible, and will require the given number of" + " acknowledgements but this is generally less useful.";
- - /** timeout.ms */ - public static final String TIMEOUT_CONFIG = "timeout.ms"; - private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the " - + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout " - + "is measured on the server side and does not include the network latency of the request."; - - /** linger.ms */
+ + /** + * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly. + */ + public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms"; + + /** + * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any + * leadership changes. + */ + public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms"; + + /** + * The buffer size allocated for a partition. When records are received which are smaller than this size the + * producer will attempt to optimistically group them together until this size is reached. + */ + public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes"; + + /** + * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent + * faster than they can be delivered to the server the producer will either block or throw an exception based on the + * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}. + */ + public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; + + /** + * The number of acknowledgments the producer requires from the server before considering a request complete. + */ + public static final String REQUIRED_ACKS_CONFIG = "request.required.acks"; + + /** + * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment + * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be + * returned. + */ + public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms"; + + /** + * The producer groups together any records that arrive in between request sends. Normally this occurs only under + * load when records arrive faster than they can be sent out. However the client can reduce the number of requests + * and increase throughput by adding a small amount of artificial delay to force more records to batch together. + * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records + * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many + * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up. + * This setting defaults to 0. + */ public static final String LINGER_MS_CONFIG = "linger.ms"; - private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " + "Normally this occurs only under load when records arrive faster than they can be sent out. 
However in some circumstances the client may want to " - + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount " - + "of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to " - + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought " - + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once " - + "we get batch.size worth of records for a partition it will be sent immediately regardless of this " - + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the " - + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, " - + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load."; - - /** client.id */ + + /** + * The id string to pass to the server when making requests. The purpose of this is to be able to track the source + * of requests beyond just ip/port by allowing a logical application name to be included. + */ public static final String CLIENT_ID_CONFIG = "client.id"; - private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The " - + "application can set any string it wants as this has no functional purpose other than in logging and metrics."; - /** send.buffer.bytes */ + /** + * The size of the TCP send buffer to use when sending data + */ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; - private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data"; - /** receive.buffer.bytes */ + /** + * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this) + */ public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; - private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data"; - /** max.request.size */ + /** + * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server + * has its own cap on record size which may be different from this. + */ public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size"; - private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record " - + "batches the producer will send in a single request to avoid sending huge requests."; - /** reconnect.backoff.ms */ + /** + * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a + * host in a tight loop. + */ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; - private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." 
+ " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop."; - /** block.on.buffer.full */ + /** + * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default + * this setting is true and we block, however users who want to guarantee we never block can turn this into an + * error. + */ public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; - private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to " - + "immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full."; - /** retries */ - public static final String RETRIES_CONFIG = "retries"; - private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the " - + "error. Allowing retries will potentially change the ordering of records because if two records are " - + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second record " - + "may appear first."; + /** + * The maximum number of times to attempt resending the request before giving up. + */ + public static final String MAX_RETRIES_CONFIG = "request.retries"; - /** retry.backoff.ms */ + /** + * The amount of time to wait before attempting to resend produce request to a given topic partition. This avoids + * repeated sending-and-failing in a tight loop + */ public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; - private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop."; - /** compression.type */ + /** + * The compression type for all data generated. The default is none (i.e. no compression) + */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; - private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, or snappy. Compression is of full batches of data, " - + " so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; - /** metrics.sample.window.ms */ - public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; - private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. " - + "When a window expires we erase and overwrite the oldest window."; + /** + * The window size for a single metrics sample in ms. Defaults to 30 seconds. 
+ */ + public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms"; - /** metrics.num.samples */ - public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; - private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; + /** + * The number of samples used when reporting metrics. Defaults to two. So by default we use two 30 second windows, + * so metrics are computed over up to 60 seconds. + */ + public static final String METRICS_NUM_SAMPLES = "metrics.num.samples"; - /** metric.reporters */ - public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; - private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + /** + * Should we register the Kafka metrics as JMX mbeans? + */ + public static final String ENABLE_JMX_CONFIG = "enable.jmx"; static { - config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) - .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) - .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) - .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC) - .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) - .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) - .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) - .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) - .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC) - .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC) - .define(MAX_REQUEST_SIZE_CONFIG, - Type.INT, - 1 * 1024 * 1024, - atLeast(0), - Importance.MEDIUM, - MAX_REQUEST_SIZE_DOC) - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) - .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) - .define(METADATA_FETCH_TIMEOUT_CONFIG, - Type.LONG, - 60 * 1000, - atLeast(0), - Importance.LOW, - METADATA_FETCH_TIMEOUT_DOC) - .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) - .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, - Type.LONG, - 30000, - atLeast(0), - Importance.LOW, - METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC); + /* TODO: add docs */ + config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah") + .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah") + .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah") + .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah") + .define(MAX_PARTITION_SIZE_CONFIG, 
Type.INT, 16384, atLeast(0), "blah blah") + .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah") + /* TODO: should be a string to handle acks=in-sync */ + .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah") + .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah") + .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah") + .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah") + .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") + .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") + .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah") + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") + .define(METRICS_SAMPLE_WINDOW_MS, Type.LONG, 30000, atLeast(0), "") + .define(METRICS_NUM_SAMPLES, Type.INT, 2, atLeast(1), ""); } ProducerConfig(Map props) { super(config, props); } - public static void main(String[] args) { - System.out.println(config.toHtmlTable()); - } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 855ae84..2c200b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.network.NetworkReceive; @@ -807,32 +808,97 @@ public class Sender implements Runnable { }); } + public void maybeRegisterNodeMetrics(int node) { + if (node >= 0) { + String nodeRequestName = "node-" + node + ".bytes-sent"; + Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); + if (nodeRequest == null) { + nodeRequest = this.metrics.sensor(nodeRequestName); + nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate()); + nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count())); + nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg()); + nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max()); + } + + String nodeResponseName = "node-" + node + ".bytes-received"; + Sensor nodeResponse = this.metrics.getSensor(nodeResponseName); + if (nodeResponse == null) { + nodeResponse = this.metrics.sensor(nodeResponseName); + nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate()); + nodeResponse.add("node-" + node + ".response-rate", + "The average number of responses received per second.", + new Rate(new Count())); + } + + String nodeTimeName = "server." 
+ node + ".latency"; + Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); + if (nodeRequestTime == null) { + nodeRequestTime = this.metrics.sensor(nodeTimeName); + nodeRequestTime.add("node-" + node + ".latency-avg", new Avg()); + nodeRequestTime.add("node-" + node + ".latency-max", new Max()); + } + } + + } + + public void maybeRegisterTopicMetrics(String topic) { + String topicRecordsCountName = "topic." + topic + ".records-per-batch"; + Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); + if (topicRecordCount == null) { + topicRecordCount = this.metrics.sensor(topicRecordsCountName); + topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate()); + } + + String topicByteRateName = "topic." + topic + ".bytes"; + Sensor topicByteRate = this.metrics.getSensor(topicByteRateName); + if (topicByteRate == null) { + topicByteRate = this.metrics.sensor(topicByteRateName); + topicByteRate.add("topic." + topic + ".byte-rate", new Rate()); + } + + String topicRetryName = "topic." + topic + ".record-retries"; + Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); + if (topicRetrySensor == null) { + topicRetrySensor = this.metrics.sensor(topicRetryName); + topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate()); + } + + String topicErrorName = "topic." + topic + ".record-errors"; + Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); + if (topicErrorSensor == null) { + topicErrorSensor = this.metrics.sensor(topicErrorName); + topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate()); + } + } + public void updateProduceRequestMetrics(List requests) { long ns = time.nanoseconds(); for (int i = 0; i < requests.size(); i++) { InFlightRequest request = requests.get(i); int records = 0; + + // register all per-broker metrics at once + int id = request.request.destination(); + maybeRegisterNodeMetrics(id); + if (request.batches != null) { for (RecordBatch batch : request.batches.values()) { - // per-topic record count - String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch"; - Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); - if (topicRecordCount == null) { - topicRecordCount = this.metrics.sensor(topicRecordsCountName); - topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate()); - } + // register all per-topic metrics at once + String topic = batch.topicPartition.topic(); + maybeRegisterTopicMetrics(topic); + + // per-topic record send rate + String topicRecordsCountName = "topic." + topic + ".records-per-batch"; + Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName)); topicRecordCount.record(batch.recordCount); - // per-topic bytes-per-second - String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes"; - Sensor topicByteRate = this.metrics.getSensor(topicByteRateName); - if (topicByteRate == null) { - topicByteRate = this.metrics.sensor(topicByteRateName); - topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate()); - } + // per-topic bytes send rate + String topicByteRateName = "topic." 
+ topic + ".bytes"; + Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); topicByteRate.record(batch.records.sizeInBytes()); + // global metrics this.batchSizeSensor.record(batch.records.sizeInBytes(), ns); this.queueTimeSensor.record(batch.drained - batch.created, ns); this.maxRecordSizeSensor.record(batch.maxRecordSize, ns); @@ -847,22 +913,14 @@ public class Sender implements Runnable { this.retrySensor.record(count); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); - if (topicRetrySensor == null) { - topicRetrySensor = this.metrics.sensor(topicRetryName); - topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate()); - } - topicRetrySensor.record(count); + if (topicRetrySensor != null) topicRetrySensor.record(count); } public void recordErrors(String topic, int count) { this.errorSensor.record(count); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor == null) { - topicErrorSensor = this.metrics.sensor(topicErrorName); - topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate()); - } - topicErrorSensor.record(count); + if (topicErrorSensor != null) topicErrorSensor.record(count); } public void recordLatency(int node, long latency, long nowNs) { @@ -870,12 +928,7 @@ public class Sender implements Runnable { if (node >= 0) { String nodeTimeName = "server." + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime == null) { - nodeRequestTime = this.metrics.sensor(nodeTimeName); - nodeRequestTime.add("node-" + node + ".latency-avg", new Avg()); - nodeRequestTime.add("node-" + node + ".latency-max", new Max()); - } - nodeRequestTime.record(latency, nowNs); + if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index eb18739..3b3fb2c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -36,12 +36,12 @@ public class ProducerPerformance { int recordSize = Integer.parseInt(args[3]); int acks = Integer.parseInt(args[4]); Properties props = new Properties(); - props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks)); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url); + props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks)); + props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url); props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000)); - props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); - props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); - props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024)); + props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); + props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); + props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(64 * 1024)); if (args.length == 6) props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]); diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 8d88610..84a327e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -12,7 +12,6 @@ */ package org.apache.kafka.common.config; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -112,9 +111,6 @@ public class AbstractConfig { log.info(b.toString()); } - /** - * Log warnings for any unused configurations - */ public void logUnused() { for (String key : unused()) log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key)); @@ -140,21 +136,4 @@ public class AbstractConfig { return t.cast(o); } - public List getConfiguredInstances(String key, Class t) { - List klasses = getList(key); - List objects = new ArrayList(); - for (String klass : klasses) { - Class c = getClass(klass); - if (c == null) - return null; - Object o = Utils.newInstance(c); - if (!t.isInstance(o)) - throw new KafkaException(c.getName() + " is not an instance of " + t.getName()); - if (o instanceof Configurable) - ((Configurable) o).configure(this.originals); - objects.add(t.cast(o)); - } - return objects; - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 9ba7ee7..67b349d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -1,21 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.kafka.common.config; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,7 +45,7 @@ import java.util.Map; */ public class ConfigDef { - private static final Object NO_DEFAULT_VALUE = new String(""); + private static final Object NO_DEFAULT_VALUE = new Object(); private final Map configKeys = new HashMap(); @@ -54,15 +55,14 @@ public class ConfigDef { * @param type The type of the config * @param defaultValue The default value to use if this config isn't present * @param validator A validator to use in checking the correctness of the config - * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) { if (configKeys.containsKey(name)) throw new ConfigException("Configuration " + name + " is defined twice."); Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type); - configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation)); + configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation)); return this; } @@ -71,12 +71,11 @@ public class ConfigDef { * @param name The name of the config parameter * @param type The type of the config * @param defaultValue The default value to use if this config isn't present - * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) { - return define(name, type, defaultValue, null, importance, documentation); + public ConfigDef define(String name, Type type, Object defaultValue, String documentation) { + return define(name, type, defaultValue, null, documentation); } /** @@ -84,24 +83,22 @@ public class ConfigDef { * @param name The name of the config parameter * @param type The type of the config * @param validator A validator to use in checking the correctness of the config - * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) { - return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation); + public ConfigDef define(String name, Type type, Validator validator, String documentation) { + return define(name, type, NO_DEFAULT_VALUE, validator, documentation); } /** * Define a required parameter with no default value and no special validation logic * @param name The name of the config parameter * @param type The type of the config - * @param importance The importance of this config: is this something you will likely need to change. 
* @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Importance importance, String documentation) { - return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation); + public ConfigDef define(String name, Type type, String documentation) { + return define(name, type, NO_DEFAULT_VALUE, null, documentation); } /** @@ -209,10 +206,6 @@ public class ConfigDef { BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS; } - public enum Importance { - HIGH, MEDIUM, LOW - } - /** * Validation logic the user may provide */ @@ -237,7 +230,7 @@ public class ConfigDef { * @param min The minimum acceptable value */ public static Range atLeast(Number min) { - return new Range(min, null); + return new Range(min, Double.MAX_VALUE); } /** @@ -249,19 +242,8 @@ public class ConfigDef { public void ensureValid(String name, Object o) { Number n = (Number) o; - if (min != null && n.doubleValue() < min.doubleValue()) - throw new ConfigException(name, o, "Value must be at least " + min); - if (max != null && n.doubleValue() > max.doubleValue()) - throw new ConfigException(name, o, "Value must be no more than " + max); - } - - public String toString() { - if (min == null) - return "[...," + max + "]"; - else if (max == null) - return "[" + min + ",...]"; - else - return "[" + min + ",...," + max + "]"; + if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue()) + throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]"); } }
@@ -271,75 +253,17 @@ public class ConfigDef { public final String documentation; public final Object defaultValue; public final Validator validator; - public final Importance importance; - public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { + public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) { super(); this.name = name; this.type = type; this.defaultValue = defaultValue; this.validator = validator; - this.importance = importance; if (this.validator != null) this.validator.ensureValid(name, defaultValue); this.documentation = documentation; } - public boolean hasDefault() { - return this.defaultValue != NO_DEFAULT_VALUE; - } - - } -
- public String toHtmlTable() { - // sort first required fields, then by importance, then name - List<ConfigDef.ConfigKey> configs = new ArrayList<ConfigDef.ConfigKey>(this.configKeys.values()); - Collections.sort(configs, new Comparator<ConfigDef.ConfigKey>() { - public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) { - // first take anything with no default value - if (!k1.hasDefault() && k2.hasDefault()) - return -1; - else if (!k2.hasDefault() && k1.hasDefault()) - return 1; - - // then sort by importance - int cmp = k1.importance.compareTo(k2.importance); - if (cmp == 0) - // then sort in alphabetical order - return k1.name.compareTo(k2.name); - else - return cmp; - } - }); - StringBuilder b = new StringBuilder(); - b.append("<table>\n"); - b.append("<tr>\n"); - b.append("<th>Name</th>\n"); - b.append("<th>Type</th>\n"); - b.append("<th>Default</th>\n"); - b.append("<th>Importance</th>\n"); - b.append("<th>Description</th>\n"); - b.append("</tr>\n"); - for (ConfigKey def : configs) { - b.append("<tr>\n"); - b.append("<td>"); - b.append(def.name); - b.append("</td>"); - b.append("<td>"); - b.append(def.type.toString().toLowerCase()); - b.append("</td>"); - b.append("<td>"); - b.append(def.defaultValue == null ? "" : def.defaultValue); - b.append("</td>"); - b.append("<td>"); - b.append(def.importance.toString().toLowerCase()); - b.append("</td>"); - b.append("<td>"); - b.append(def.documentation); - b.append("</td>"); - b.append("</tr>\n"); - } - b.append("</table>"); - return b.toString(); - } }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 3c31201..3950eb1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -42,7 +42,7 @@ public class JmxReporter implements MetricsReporter { private static final Logger log = LoggerFactory.getLogger(JmxReporter.class); private static final Object lock = new Object(); - private String prefix; + private final String prefix; private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>(); public JmxReporter() { @@ -57,10 +57,6 @@ public class JmxReporter { } @Override - public void configure(Map<String, ?> configs) { - } - - @Override public void init(List<KafkaMetric> metrics) { synchronized (lock) { for (KafkaMetric metric : metrics)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 7acc19e..2c395b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -1,25 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.kafka.common.metrics; import java.util.List; -import org.apache.kafka.common.Configurable; - /** - * A plugin interface to allow things to listen as new metrics are created so they can be reported. 
+ * A plugin interface to allow things to listen as new metrics are created so they can be reported */ -public interface MetricsReporter extends Configurable { +public interface MetricsReporter { /** * This is called when the reporter is first registered to initially register all existing metrics diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 558f8b4..8314a07 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -221,6 +221,7 @@ public class Selector implements Selectable { Transmissions transmissions = transmissions(key); SocketChannel channel = channel(key); + try { /* complete any connections that have finished their handshake */ if (key.isConnectable()) { @@ -438,32 +439,18 @@ public class Selector implements Selectable { public void recordBytesSent(int node, int bytes) { this.bytesSent.record(bytes); if (node >= 0) { - String name = "node-" + node + ".bytes-sent"; - Sensor sensor = this.metrics.getSensor(name); - if (sensor == null) { - sensor = this.metrics.sensor(name); - sensor.add("node-" + node + ".outgoing-byte-rate", new Rate()); - sensor.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count())); - sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg()); - sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max()); - } - sensor.record(bytes); + String nodeRequestName = "node-" + node + ".bytes-sent"; + Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); + if (nodeRequest != null) nodeRequest.record(bytes); } } public void recordBytesReceived(int node, int bytes) { this.bytesReceived.record(bytes); if (node >= 0) { - String name = "node-" + node + ".bytes-received"; - Sensor sensor = this.metrics.getSensor(name); - if (sensor == null) { - sensor = this.metrics.sensor(name); - sensor.add("node-" + node + ".incoming-byte-rate", new Rate()); - sensor.add("node-" + node + ".response-rate", - "The average number of responses received per second.", - new Rate(new Count())); - } - sensor.record(bytes); + String nodeRequestName = "node-" + node + ".bytes-received"; + Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); + if (nodeRequest != null) nodeRequest.record(bytes); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 09a82fe..29543df 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -1,14 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.kafka.common.config; @@ -20,7 +24,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.kafka.common.config.ConfigDef.Importance; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigDef.Range; import org.apache.kafka.common.config.ConfigDef.Type; import org.junit.Test; @@ -29,13 +35,13 @@ public class ConfigDefTest { @Test public void testBasicTypes() { - ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs") - .define("b", Type.LONG, Importance.HIGH, "docs") - .define("c", Type.STRING, "hello", Importance.HIGH, "docs") - .define("d", Type.LIST, Importance.HIGH, "docs") - .define("e", Type.DOUBLE, Importance.HIGH, "docs") - .define("f", Type.CLASS, Importance.HIGH, "docs") - .define("g", Type.BOOLEAN, Importance.HIGH, "docs"); + ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs") + .define("b", Type.LONG, "docs") + .define("c", Type.STRING, "hello", "docs") + .define("d", Type.LIST, "docs") + .define("e", Type.DOUBLE, "docs") + .define("f", Type.CLASS, "docs") + .define("g", Type.BOOLEAN, "docs"); Properties props = new Properties(); props.put("a", "1 "); @@ -57,22 +63,22 @@ public class ConfigDefTest { @Test(expected = ConfigException.class) public void testInvalidDefault() { - new ConfigDef().define("a", Type.INT, "hello", Importance.HIGH, "docs"); + new ConfigDef().define("a", Type.INT, "hello", "docs"); } @Test(expected = ConfigException.class) public void testNullDefault() { - new ConfigDef().define("a", Type.INT, null, null, null, "docs"); + new ConfigDef().define("a", Type.INT, null, null, "docs"); } @Test(expected = ConfigException.class) public void testMissingRequired() { - new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs").parse(new HashMap()); + new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap()); } @Test(expected = ConfigException.class) public void testDefinedTwice() { - new ConfigDef().define("a", Type.STRING, Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs"); + new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs"); } @Test @@ -88,7 +94,7 @@ public class 
ConfigDefTest { for (Object value : values) { Map m = new HashMap(); m.put("name", value); - ConfigDef def = new ConfigDef().define("name", type, Importance.HIGH, "docs"); + ConfigDef def = new ConfigDef().define("name", type, "docs"); try { def.parse(m); fail("Expected a config exception on bad input for value " + value); diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 2dad20e..1d73aca 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -71,7 +71,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK def testCompression() { val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) var producer = new KafkaProducer(props) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index ef56044..525a060 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -67,11 +67,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long, blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = { val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) + producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString) producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) - producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) + producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) return new KafkaProducer(producerProps) } @@ -314,10 +314,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness var failed = false val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString) - producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) - producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString) + producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString) + producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) + producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString) val producer = new KafkaProducer(producerProps) diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 60e68c7..3c37330 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -91,7 +91,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testSendOffset() { val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) val callback = new CheckErrorCallback @@ -149,7 +149,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testClose() { val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) try { @@ -187,8 +187,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testSendToPartition() { val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - props.put(ProducerConfig.ACKS_CONFIG, "-1") + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1") var producer = new KafkaProducer(props) try { @@ -245,7 +245,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testAutoCreateTopic() { val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) try { diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 1490bdb..9e4ebaf 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -204,16 +204,15 @@ object ProducerPerformance extends Logging { } class NewShinyProducer(config: ProducerPerfConfig) extends Producer { - import org.apache.kafka.clients.producer.ProducerConfig val props = new Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test") - props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + props.put("metadata.broker.list", config.brokerList) + props.put("send.buffer.bytes", (64 * 1024).toString) + props.put("client.id", "perf-test") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("request.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("compression.type", config.compressionCodec.name) val producer = new 
KafkaProducer(props) def send(topic: String, partition: Long, bytes: Array[Byte]) { diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties index 7f48b07..3ec69fa 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties @@ -1,5 +1,6 @@ block.on.buffer.full=true -bootstrap.servers=localhost:9094 +metadata.broker.list=localhost:9094 +compression.codec=0 compression.type=none -retries=3 -acks=1 +request.retries=3 +request.required.acks=1
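
A usage note on the renames above: every renamed key flows through a ProducerConfig constant, so code that uses the constants keeps compiling, but anything that sets raw strings (properties files, the perf tool) must move to the new names. A minimal sketch of producer setup against the renamed keys; the broker address, topic, and payload are placeholders, and the byte-array ProducerRecord constructor is assumed from this generation of the client:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRenameSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // was bootstrap.servers
        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");
        // was acks; the value is an int now, so acks=all must be written as -1
        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
        // was retries
        props.setProperty(ProducerConfig.MAX_RETRIES_CONFIG, "3");
        // was buffer.memory
        props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Long.toString(32 * 1024 * 1024L));
        // was batch.size
        props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, "16384");

        KafkaProducer producer = new KafkaProducer(props);
        producer.send(new ProducerRecord("test-topic", "hello".getBytes()));
        producer.close();
    }
}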
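
The metrics changes all follow one pattern: create sensors once, off the hot path, and make recording a null-checked lookup. Sender now pre-registers per-node and per-topic sensors (maybeRegisterNodeMetrics, maybeRegisterTopicMetrics), while Selector.recordBytesSent/recordBytesReceived and the retry/error/latency recorders only record into sensors that already exist. A condensed sketch of the idiom, built from the same Metrics/Sensor calls the patch uses (the class itself is illustrative, not part of the patch):

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class PerNodeBytesSent {
    private final Metrics metrics;

    public PerNodeBytesSent(Metrics metrics) {
        this.metrics = metrics;
    }

    // Registration path: runs where sensor creation is acceptable (Sender).
    public void maybeRegister(int node) {
        String name = "node-" + node + ".bytes-sent";
        if (this.metrics.getSensor(name) == null) {
            Sensor sensor = this.metrics.sensor(name);
            sensor.add("node-" + node + ".outgoing-byte-rate", new Rate());
        }
    }

    // Hot path: runs on every send (Selector); never creates, records only if registered.
    public void record(int node, int bytes) {
        Sensor sensor = this.metrics.getSensor("node-" + node + ".bytes-sent");
        if (sensor != null)
            sensor.record(bytes);
    }
}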
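
Finally, with Importance removed from ConfigDef, a definition is now just name, type, optional default, optional validator, and a doc string. A sketch of the reduced API with hypothetical keys, using the same atLeast Range validator the patch's static block uses, and assuming parse returns the parsed value map as in the tests:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

public class ConfigDefSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            // with a default and a validator
            .define("example.timeout.ms", Type.LONG, 30 * 1000, atLeast(0), "An example timeout in milliseconds.")
            // with a default only
            .define("example.enabled", Type.BOOLEAN, true, "An example flag.");

        Map<String, String> props = new HashMap<String, String>();
        props.put("example.timeout.ms", "5000");
        Map<String, Object> parsed = def.parse(props);
        System.out.println(parsed.get("example.timeout.ms")); // 5000 as a Long
    }
}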