diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 8c1c575..670d183 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -96,28 +96,30 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config) { log.trace("Starting the Kafka producer"); Time time = new SystemTime(); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES)) - .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS), + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : ""); - List<MetricsReporter> reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix)); + List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = new Partitioner(); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG), - config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); + config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); - this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), + this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time); - List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); + List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); this.sender = new Sender(new Selector(this.metrics, time), this.metadata, @@ -125,9 +127,9 @@ public class KafkaProducer implements Producer { clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), - (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), - config.getInt(ProducerConfig.MAX_RETRIES_CONFIG), - config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), + (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), + config.getInt(ProducerConfig.RETRIES_CONFIG), + config.getInt(ProducerConfig.TIMEOUT_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), this.metrics, @@ -142,13 +144,21 @@ public class KafkaProducer implements Producer
{ log.debug("Kafka producer started"); } + private static int parseAcks(String acksString) { + try { + return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim()); + } catch (NumberFormatException e) { + throw new ConfigException("Invalid configuration value for 'acks': " + acksString); + } + } + private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) { List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(); for (String url : urls) { if (url != null && url.length() > 0) { String[] pieces = url.split(":"); if (pieces.length != 2) - throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url); + throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); try { InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); if (address.isUnresolved()) @@ -263,15 +273,15 @@ public class KafkaProducer implements Producer { */ private void ensureValidRecordSize(int size) { if (size > this.maxRequestSize) - throw new RecordTooLargeException("The message is " + size - + " bytes when serialized which is larger than the maximum request size you have configured with the " - + ProducerConfig.MAX_REQUEST_SIZE_CONFIG - + " configuration."); + throw new RecordTooLargeException("The message is " + size + + " bytes when serialized which is larger than the maximum request size you have configured with the " + + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + + " configuration."); if (size > this.totalMemorySize) - throw new RecordTooLargeException("The message is " + size - + " bytes when serialized which is larger than the total memory buffer you have configured with the " - + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG - + " configuration."); + throw new RecordTooLargeException("The message is " + size + + " bytes when serialized which is larger than the total memory buffer you have configured with the " + + ProducerConfig.BUFFER_MEMORY_CONFIG + + " configuration."); } public List<PartitionInfo> partitionsFor(String topic) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 259c14b..779a1f0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -19,171 +19,207 @@ import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; /** - * The producer configuration keys + * Configuration for the Kafka Producer. Documentation for these configurations can be found in the Kafka documentation */ public class ProducerConfig extends AbstractConfig { + /* + * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND + * CHANGE WILL BREAK USER CODE. + */ + private static final ConfigDef config; - /** - * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form - * host1:port1,host2:port2,.... These urls are just used for the initial connection to discover the - * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you - * may want more than one, though, in case a server is down).
- */ - public static final String BROKER_LIST_CONFIG = "metadata.broker.list"; + /** bootstrap.servers */ + public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; + private static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping—this list only " + + "impacts the initial hosts used to discover the full set of servers. This list should be in the form " + + "host1:port1,host2:port2,.... Since these servers are just used for the initial connection to " + + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of " + + "servers (you may want more than one, though, in case a server is down)."; - /** - * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that - * topic. - */ + /** metadata.fetch.timeout.ms */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; + private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata " + + "fetch to succeed before throwing an exception back to the client."; - /** - * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly. - */ + /** metadata.fetch.backoff.ms */ public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms"; - - /** - * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any - * leadership changes. - */ - public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms"; - - /** - * The buffer size allocated for a partition. When records are received which are smaller than this size the - * producer will attempt to optimistically group them together until this size is reached. - */ - public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes"; - - /** - * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent - * faster than they can be delivered to the server the producer will either block or throw an exception based on the - * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}. - */ - public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; - - /** - * The number of acknowledgments the producer requires from the server before considering a request complete. - */ - public static final String REQUIRED_ACKS_CONFIG = "request.required.acks"; - - /** - * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment - * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be - * returned. - */ - public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms"; - - /** - * The producer groups together any records that arrive in between request sends. Normally this occurs only under - * load when records arrive faster than they can be sent out. However the client can reduce the number of requests - * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
- * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records - * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many - * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up. - * This setting defaults to 0. - */ + private static final String METADATA_FETCH_BACKOFF_DOC = "The minimum amount of time between metadata refreshes. The client refreshes metadata whenever it realizes its " + "internal metadata is out of sync with the actual leadership of partitions. This configuration specifies a backoff " + + "to prevent metadata refreshes from happening too frequently."; + + /** metadata.max.age.ms */ + public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; + private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + "leadership changes to proactively discover any new brokers or partitions."; + + /** batch.size */ + public static final String BATCH_SIZE_CONFIG = "batch.size"; + private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the " + + "default batch size in bytes. " + + "
<p>" + + "No attempt will be made to batch records larger than this size. " + + "<p>" + + "Requests sent to brokers will contain multiple batches, one for each partition there is data for. " + + "<p>" + + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable " + + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a " + + "buffer of the specified batch size in anticipation of additional messages."; + + /** buffer.memory */ + public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; + private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " + "sent faster than they can be delivered to the server the producer will either block or throw an exception based " + + "on the preference specified by block.on.buffer.full. " + + "<p>" + + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since " + + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if " + + "compression is enabled) as well as for maintaining in-flight requests."; + + /** acks */ + public static final String ACKS_CONFIG = "acks"; + private static final String ACKS_DOC = "The number of acknowledgments the producer requires before considering a request complete. This controls the " + " durability of records that are sent. The following settings are commonly useful: " + + " <ul>" + + " <li>acks=0 If set to zero then the producer will not wait for any acknowledgment from the" + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be" + " made that the server has received the record in this case, and the retries configuration will not" + " take effect (as the client won't generally know of any failures). The offset given back for each message will" + " always be set to -1." + + " <li>acks=1 This will mean the leader will write the record to its local log but will respond" + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after" + " acknowledging the record but before the followers have replicated it then the record will be lost." + + " <li>acks=all This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee." + + " <li>Other settings such as acks=2 are also possible, and will require the given number of" + " acknowledgements but this is generally less useful." + + " </ul>"; + + /** timeout.ms */ + public static final String TIMEOUT_CONFIG = "timeout.ms"; + private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the acks configuration. If the " + + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout " + + "is measured on the server side and does not include the network latency of the request."; + + /** linger.ms */ public static final String LINGER_MS_CONFIG = "linger.ms"; - - /** - * The id string to pass to the server when making requests. The purpose of this is to be able to track the source - * of requests beyond just ip/port by allowing a logical application name to be included. - */ + private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request sends. Normally this occurs only under " + "load when records arrive faster than they can be sent out. However in some circumstances the client may want to " + + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount " + + "of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to " + + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought " + + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once " + + "we get batch.size worth of records for a partition it will be sent immediately regardless of this " + + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the " + + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay)."; + + /** client.id */ public static final String CLIENT_ID_CONFIG = "client.id"; + private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The " + + "application can set any string it wants as this has no functional purpose other than in logging and metrics."; - /** - * The size of the TCP send buffer to use when sending data - */ + /** send.buffer.bytes */ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; + private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data"; - /** - * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this) - */ + /** receive.buffer.bytes */ public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; + private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data";
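The settings documented above interact: linger.ms bounds how long a send may wait for batch.size bytes to accumulate, while buffer.memory caps the total unsent data the producer will hold. As a rough illustration (not part of this patch, and the tuning values are invented), a producer that trades a little latency for larger batches could be configured with the constants introduced here:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BatchingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "all");          // wait for the full in-sync replica set
            props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");  // 16 KB per-partition batches
            props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5");       // wait up to 5 ms for batches to fill
            props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Long.toString(32 * 1024 * 1024L));
            KafkaProducer producer = new KafkaProducer(props);
            producer.close();
        }
    }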
- /** - * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server - * has its own cap on record size which may be different from this. - */ + /** max.request.size */ public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size"; + private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record " + + "batches the producer will send in a single request to avoid sending huge requests."; - /** - * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a - * host in a tight loop. - */ + /** reconnect.backoff.ms */ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; + private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." + " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop."; - /** - * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default - * this setting is true and we block, however users who want to guarantee we never block can turn this into an - * error. - */ + /** block.on.buffer.full */ public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; + private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to " + + "immediately give an error. Setting this to false will accomplish that."; - /** - * The maximum number of times to attempt resending the request before giving up. - */ - public static final String MAX_RETRIES_CONFIG = "request.retries"; + /** retries */ + public static final String RETRIES_CONFIG = "retries"; + private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the message upon receiving the " + + "error. Allowing retries will potentially change the ordering of messages because if two messages are " + + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second message " + + "may appear first."; - /** - * The amount of time to wait before attempting to resend produce request to a given topic partition. This avoids - * repeated sending-and-failing in a tight loop - */ + /** retry.backoff.ms */ public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; + private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop."; - /** - * The compression type for all data generated. The default is none (i.e. no compression) - */ + /** compression.type */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; + private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are none, gzip, or snappy. Compression is of full batches of data, " + + " so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; - /** - * The window size for a single metrics sample in ms. Defaults to 30 seconds.
- */ - public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms"; + /** metrics.sample.window.ms */ + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; + private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. " + + "When a window expires we erase and overwrite the oldest window."; - /** - * The number of samples used when reporting metrics. Defaults to two. So by default we use two 30 second windows, - * so metrics are computed over up to 60 seconds. - */ - public static final String METRICS_NUM_SAMPLES = "metrics.num.samples"; + /** metrics.num.samples */ + public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; + private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; - /** - * Should we register the Kafka metrics as JMX mbeans? - */ - public static final String ENABLE_JMX_CONFIG = "enable.jmx"; + /** metric.reporters */ + public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; + private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricsReporter interface allows " + "plugging in classes that will be notified of new metric creation."; static { - /* TODO: add docs */ - config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah") - .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah") - .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah") - .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah") - .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah") - .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah") - /* TODO: should be a string to handle acks=in-sync */ - .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah") - .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah") - .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah") - .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") - .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") - .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah") - .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") - .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") - .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah") - .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") - .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") - .define(METRICS_SAMPLE_WINDOW_MS, Type.LONG, 30000, atLeast(0), "") - .define(METRICS_NUM_SAMPLES, Type.INT, 2, atLeast(1), ""); + config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOTSTRAP_SERVERS_DOC) + .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) + .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) + .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC)
+ .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) + .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) + .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) + .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) + .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC) + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC) + .define(MAX_REQUEST_SIZE_CONFIG, + Type.INT, + 1 * 1024 * 1024, + atLeast(0), + Importance.MEDIUM, + MAX_REQUEST_SIZE_DOC) + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) + .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) + .define(METADATA_FETCH_TIMEOUT_CONFIG, + Type.LONG, + 60 * 1000, + atLeast(0), + Importance.LOW, + METADATA_FETCH_TIMEOUT_DOC) + .define(METADATA_FETCH_BACKOFF_CONFIG, + Type.LONG, + 50, + atLeast(0), + Importance.LOW, + METADATA_FETCH_BACKOFF_DOC) + .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + 30000, + atLeast(0), + Importance.LOW, + METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC); } ProducerConfig(Map props) { super(config, props); } + public static void main(String[] args) { + System.out.println(config.toHtmlTable()); + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 3b3fb2c..eb18739 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -36,12 +36,12 @@ public class ProducerPerformance { int recordSize = Integer.parseInt(args[3]); int acks = Integer.parseInt(args[4]); Properties props = new Properties(); - props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks)); - props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url); + props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks)); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url); props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000)); - props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); - props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); - props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(64 * 1024)); + props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); + props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); + props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024)); if (args.length == 6) props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
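The metric.reporters list defined above is consumed through a new AbstractConfig helper shown in the next hunk, which instantiates each named class reflectively. A hedged sketch of the user-facing side (the reporter class name is a hypothetical placeholder):

    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092");
    // com.example.LoggingReporter is a placeholder: any class implementing
    // MetricsReporter with a no-argument constructor can be listed here, and
    // KafkaProducer always appends its JmxReporter regardless of this list.
    props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.example.LoggingReporter");
    KafkaProducer producer = new KafkaProducer(props);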
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 84a327e..8d88610 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.common.config; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -111,6 +112,9 @@ public class AbstractConfig { log.info(b.toString()); } + /** + * Log warnings for any unused configurations + */ public void logUnused() { for (String key : unused()) log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key)); @@ -136,4 +140,21 @@ public class AbstractConfig { return t.cast(o); } + public <T> List<T> getConfiguredInstances(String key, Class<T> t) { + List<String> klasses = getList(key); + List<T> objects = new ArrayList<T>(); + for (String klass : klasses) { + Class<?> c = getClass(klass); + if (c == null) + return null; + Object o = Utils.newInstance(c); + if (!t.isInstance(o)) + throw new KafkaException(c.getName() + " is not an instance of " + t.getName()); + if (o instanceof Configurable) + ((Configurable) o).configure(this.originals); + objects.add(t.cast(o)); + } + return objects; + } + }
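The ConfigDef changes that follow thread an Importance level through every define() overload and add an HTML documentation generator. A minimal sketch of the resulting API, assuming the Importance enum and toHtmlTable() introduced below:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    public class ConfigDefSketch {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef()
                .define("acks", Type.STRING, "1", Importance.HIGH, "Acknowledgments required before a request completes.");
            Map<String, Object> props = new HashMap<String, Object>();
            props.put("acks", "all");
            Map<String, Object> parsed = def.parse(props); // validated, typed values
            System.out.println(parsed.get("acks"));        // prints: all
            System.out.println(def.toHtmlTable());         // HTML table sorted by importance
        }
    }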
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 67b349d..9ba7ee7 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -1,22 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.config; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,7 +44,7 @@ import java.util.Map; */ public class ConfigDef { - private static final Object NO_DEFAULT_VALUE = new Object(); + private static final Object NO_DEFAULT_VALUE = new String(""); private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>(); @@ -55,14 +54,15 @@ public class ConfigDef { * @param type The type of the config * @param defaultValue The default value to use if this config isn't present * @param validator A validator to use in checking the correctness of the config + * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) { + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { if (configKeys.containsKey(name)) throw new ConfigException("Configuration " + name + " is defined twice."); Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type); - configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation)); + configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation)); return this; } @@ -71,11 +71,12 @@ public class ConfigDef { * @param name The name of the config parameter * @param type The type of the config * @param defaultValue The default value to use if this config isn't present + * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, String documentation) { - return define(name, type, defaultValue, null, documentation); + public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) { + return define(name, type, defaultValue, null, importance, documentation); } /** @@ -83,22 +84,24 @@ public class ConfigDef { * @param name The name of the config parameter * @param type The type of the config * @param validator A validator to use in checking the correctness of the config + * @param importance The importance of this config: is this something you will likely need to change. * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Validator validator, String documentation) { - return define(name, type, NO_DEFAULT_VALUE, validator, documentation); + public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) { + return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation); } /** * Define a required parameter with no default value and no special validation logic * @param name The name of the config parameter * @param type The type of the config + * @param importance The importance of this config: is this something you will likely need to change.
* @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, String documentation) { - return define(name, type, NO_DEFAULT_VALUE, null, documentation); + public ConfigDef define(String name, Type type, Importance importance, String documentation) { + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation); } /** @@ -206,6 +209,10 @@ public class ConfigDef { BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS; } + public enum Importance { + HIGH, MEDIUM, LOW + } + /** * Validation logic the user may provide */ @@ -230,7 +237,7 @@ * @param min The minimum acceptable value */ public static Range atLeast(Number min) { - return new Range(min, Double.MAX_VALUE); + return new Range(min, null); } /** @@ -242,8 +249,19 @@ public void ensureValid(String name, Object o) { Number n = (Number) o; - if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue()) - throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]"); + if (min != null && n.doubleValue() < min.doubleValue()) + throw new ConfigException(name, o, "Value must be at least " + min); + if (max != null && n.doubleValue() > max.doubleValue()) + throw new ConfigException(name, o, "Value must be no more than " + max); + } + + public String toString() { + if (min == null) + return "[...," + max + "]"; + else if (max == null) + return "[" + min + ",...]"; + else + return "[" + min + ",...," + max + "]"; } } @@ -253,17 +271,75 @@ public final String documentation; public final Object defaultValue; public final Validator validator; + public final Importance importance; - public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) { + public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { super(); this.name = name; this.type = type; this.defaultValue = defaultValue; this.validator = validator; + this.importance = importance; if (this.validator != null) this.validator.ensureValid(name, defaultValue); this.documentation = documentation; } + public boolean hasDefault() { + return this.defaultValue != NO_DEFAULT_VALUE; + } + + } + + public String toHtmlTable() { + // sort first required fields, then by importance, then name + List<ConfigKey> configs = new ArrayList<ConfigKey>(this.configKeys.values()); + Collections.sort(configs, new Comparator<ConfigKey>() { + public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) { + // first take anything with no default value + if (!k1.hasDefault() && k2.hasDefault()) + return -1; + else if (!k2.hasDefault() && k1.hasDefault()) + return 1; + + // then sort by importance + int cmp = k1.importance.compareTo(k2.importance); + if (cmp == 0) + // then sort in alphabetical order + return k1.name.compareTo(k2.name); + else + return cmp; + } + }); + StringBuilder b = new StringBuilder(); + b.append("<table>\n"); + b.append("<tr>\n"); + b.append("<th>Name</th>\n"); + b.append("<th>Type</th>\n"); + b.append("<th>Default</th>\n"); + b.append("<th>Importance</th>\n"); + b.append("<th>Description</th>\n"); + b.append("</tr>\n"); + for (ConfigKey def : configs) { + b.append("<tr>\n"); + b.append("<td>"); + b.append(def.name); + b.append("</td>"); + b.append("<td>"); + b.append(def.type.toString().toLowerCase()); + b.append("</td>"); + b.append("<td>"); + b.append(def.defaultValue == null ? "" : def.defaultValue); + b.append("</td>"); + b.append("<td>"); + b.append(def.importance.toString().toLowerCase()); + b.append("</td>"); + b.append("<td>"); + b.append(def.documentation); + b.append("</td>"); + b.append("</tr>\n"); + } + b.append("</table>"); + return b.toString(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 3950eb1..3c31201 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -42,7 +42,7 @@ public class JmxReporter implements MetricsReporter { private static final Logger log = LoggerFactory.getLogger(JmxReporter.class); private static final Object lock = new Object(); - private final String prefix; + private String prefix; private final Map mbeans = new HashMap(); public JmxReporter() { @@ -57,6 +57,10 @@ public class JmxReporter implements MetricsReporter { } @Override + public void configure(Map<String, ?> configs) { + } + + @Override public void init(List<KafkaMetric> metrics) { synchronized (lock) { for (KafkaMetric metric : metrics)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 2c395b1..7acc19e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -1,27 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics; import java.util.List; +import org.apache.kafka.common.Configurable; + /** - * A plugin interface to allow things to listen as new metrics are created so they can be reported + * A plugin interface to allow things to listen as new metrics are created so they can be reported. */ -public interface MetricsReporter { +public interface MetricsReporter extends Configurable { /** * This is called when the reporter is first registered to initially register all existing metrics diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 29543df..09a82fe 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License.
*/ package org.apache.kafka.common.config; @@ -24,9 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; - -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Range; import org.apache.kafka.common.config.ConfigDef.Type; import org.junit.Test; @@ -35,13 +29,13 @@ public class ConfigDefTest { @Test public void testBasicTypes() { - ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs") - .define("b", Type.LONG, "docs") - .define("c", Type.STRING, "hello", "docs") - .define("d", Type.LIST, "docs") - .define("e", Type.DOUBLE, "docs") - .define("f", Type.CLASS, "docs") - .define("g", Type.BOOLEAN, "docs"); + ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs") + .define("b", Type.LONG, Importance.HIGH, "docs") + .define("c", Type.STRING, "hello", Importance.HIGH, "docs") + .define("d", Type.LIST, Importance.HIGH, "docs") + .define("e", Type.DOUBLE, Importance.HIGH, "docs") + .define("f", Type.CLASS, Importance.HIGH, "docs") + .define("g", Type.BOOLEAN, Importance.HIGH, "docs"); Properties props = new Properties(); props.put("a", "1 "); @@ -63,22 +57,22 @@ public class ConfigDefTest { @Test(expected = ConfigException.class) public void testInvalidDefault() { - new ConfigDef().define("a", Type.INT, "hello", "docs"); + new ConfigDef().define("a", Type.INT, "hello", Importance.HIGH, "docs"); } @Test(expected = ConfigException.class) public void testNullDefault() { - new ConfigDef().define("a", Type.INT, null, null, "docs"); + new ConfigDef().define("a", Type.INT, null, null, null, "docs"); } @Test(expected = ConfigException.class) public void testMissingRequired() { - new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap()); + new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs").parse(new HashMap()); } @Test(expected = ConfigException.class) public void testDefinedTwice() { - new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs"); + new ConfigDef().define("a", Type.STRING, Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs"); } @Test @@ -94,7 +88,7 @@ public class ConfigDefTest { for (Object value : values) { Map m = new HashMap(); m.put("name", value); - ConfigDef def = new ConfigDef().define("name", type, "docs"); + ConfigDef def = new ConfigDef().define("name", type, Importance.HIGH, "docs"); try { def.parse(m); fail("Expected a config exception on bad input for value " + value); diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 1d73aca..2dad20e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -71,7 +71,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK def testCompression() { val props = new Properties() - props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) var producer = new KafkaProducer(props) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 525a060..ef56044 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -67,11 +67,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long, blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = { val producerProps = new Properties() - producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) - producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString) + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) - producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) return new KafkaProducer(producerProps) } @@ -314,10 +314,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness var failed = false val producerProps = new Properties() - producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) - producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString) - producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) - producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString) + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString) + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) + producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString) val producer = new KafkaProducer(producerProps) diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 3c37330..60e68c7 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -91,7 +91,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testSendOffset() { val props = new Properties() - props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) val callback = new CheckErrorCallback @@ -149,7 +149,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testClose() { val props = new Properties() - props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) try { @@ -187,8 +187,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testSendToPartition() { val props = new Properties() - props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) - 
props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1") + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.ACKS_CONFIG, "-1") var producer = new KafkaProducer(props) try { @@ -245,7 +245,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testAutoCreateTopic() { val props = new Properties() - props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) try {
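One closing note on the tests above: they still pass acks as "-1", which the parseAcks() helper added to KafkaProducer treats as identical to the new spelling, so the same setting could equally be written as:

    props.put(ProducerConfig.ACKS_CONFIG, "all"); // equivalent to "-1": wait for all in-sync replicas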