diff --git a/clients/src/main/java/kafka/clients/consumer/Consumer.java b/clients/src/main/java/kafka/clients/consumer/Consumer.java new file mode 100644 index 0000000..5955c28 --- /dev/null +++ b/clients/src/main/java/kafka/clients/consumer/Consumer.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package kafka.clients.consumer; + +import java.io.Closeable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.TopicPartition; + +/** + * @see KafkaConsumer + * @see MockConsumer + */ +public interface Consumer extends Closeable { + + /** + * Incrementally subscribe to the given list of topics. This API is mutually exclusive to + * {@link #subscribe(TopicPartition...) subscribe(partitions)} + * @param topics A variable list of topics that the consumer subscribes to + */ + public void subscribe(String...topics); + + /** + * Incrementally subscribes to a specific topic and partition. This API is mutually exclusive to + * {@link #subscribe(String...) subscribe(topics)} + * @param partitions Partitions to subscribe to + */ + public void subscribe(TopicPartition... partitions); + + /** + * Unsubscribe from the specific topics. Messages for this topic will not be returned from the next {@link #poll(long, TimeUnit) poll()} + * onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to + * unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)} + * @param topics Topics to unsubscribe from + */ + public void unsubscribe(String... topics); + + /** + * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next + * {@link #poll(long, TimeUnit) poll()} onwards. This should be used in conjunction with + * {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to + * unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)} + * @param partitions Partitions to unsubscribe from + */ + public void unsubscribe(TopicPartition... partitions); + + /** + * Fetches data for the subscribed list of topics and partitions + * @param timeout The time spent waiting in poll if data is not available. If timeout is negative, + * it waits indefinitely for data to come in + * @param timeUnit Unit of the timeout + * @return List of records for the subscribed topics and partitions as soon as data is available for a topic partition. Availability + * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. 
+ * If no data is available for timeout ms, returns an empty list + */ + public List poll(long timeout, TimeUnit timeUnit); + + /** + * Synchronously commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions. + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will result in the metadata + * for the commit offset request or throw any exception that occurred while committing the offsets for the subscribed partitions. + */ + public Future> commit(); + + /** + * Asynchronously commits the specified offsets for the specified list of topics and partitions to Kafka and returns immediately after + * sending the commit offset request. + * @param offsets The map of offsets to commit for the given topic partitions + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will result in the metadata + * for the commit offset request or throw any exception that occurred while committing the offsets for the specified partitions. + */ + public Future> commit(Map offsets); + + /** + * Overrides the fetch positions that the consumer will use on the next fetch request. + * @param offsets The map of fetch positions per topic and partition + */ + public void seek(Map offsets); + + /** + * Returns the fetch position of the specified topic partition to be used on the next {@link #poll(long, TimeUnit) poll()} + * @param partition Partition for which the fetch position will be returned + * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long, TimeUnit) poll()} + */ + public long position(TopicPartition partition); + + /** + * Fetches the last committed offsets for the input list of partitions + * @param partitions The list of partitions to return the last committed offset for + * @return The list of offsets for the specified list of partitions + */ + public Map committed(TopicPartition...partitions); + + /** + * Fetches offsets before a certain timestamp + * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. + * @param partitions The list of partitions for which the offsets are returned + * @return The offsets for messages that were written to the server before the specified timestamp. + */ + public Map offsetsBeforeTime(long timestamp, TopicPartition...partitions); + + /** + * Return a map of metrics maintained by the consumer + */ + public Map metrics(); + + /** + * Close this consumer + */ + public void close(); + +} diff --git a/clients/src/main/java/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/kafka/clients/consumer/ConsumerConfig.java new file mode 100644 index 0000000..42808d3 --- /dev/null +++ b/clients/src/main/java/kafka/clients/consumer/ConsumerConfig.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package kafka.clients.consumer; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +/** + * The consumer configuration keys + */ +public class ConsumerConfig extends AbstractConfig { + private static final ConfigDef config; + + /** + * The identifier of the group this consumer belongs to. This is required if the consumer uses either the + * group management functionality by using {@link Consumer#subscribe(String...) subscribe(topics)}. This is also required + * if the consumer uses the default Kafka based offset management strategy. + */ + public static final String GROUP_ID_CONFIG = "group.id"; + + /** + * The timeout after which, if the {@link Consumer#poll(long, TimeUnit) poll(timeout)} is not invoked, the consumer is + * marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant + * if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)} + */ + public static final String SESSION_TIMEOUT_MS = "session.timeout.ms"; + + /** + * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form + * host1:port1,host2:port2,.... These urls are just used for the initial connection to discover the + * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you + * may want more than one, though, in case a server is down). + */ + public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; + + /** + * If true, periodically commit to Kafka the offsets of messages already returned by the consumer. This committed + * offset will be used when the process fails as the position from which the consumption will begin. + */ + public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"; + + /** + * The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG} + * is turned on. + */ + public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms"; + + /** + * What to do when there is no initial offset in Kafka or if an offset is out of range: + *
+ * <ul>
+ * <li> smallest: automatically reset the offset to the smallest offset
+ * <li> largest: automatically reset the offset to the largest offset
+ * <li> disable: throw an exception to the consumer if no previous offset is found for the consumer's group
+ * <li> anything else: throw an exception to the consumer.
+ * </ul>
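+ * <p>
+ * A minimal sketch of setting this config (the key strings are those defined in this class; the values shown are illustrative):
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("auto.offset.reset", "smallest"); // start from the smallest available offset when no committed offset exists
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * }
+ * </pre>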
+ */ + public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset"; + + /** + * The minimum amount of data the server should return for a fetch request. If insufficient data is available the + * request will wait for that much data to accumulate before answering the request. + */ + public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; + + /** + * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient + * data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in + * {@link KafkaConsumer#poll(long, TimeUnit) poll(timeout)} + */ + public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; + + /** + * The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received + * from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within + * this timeout. + */ + public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; + + /** + * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly. + */ + public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms"; + + /** + * The total memory used by the consumer to buffer records received from the server. This config is meant to control + * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions. + */ + public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; + + /** + * The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower + * bound on the consumer's memory utilization when there is at least one message for a partition available on the server. + * This size must be at least as large as the maximum message size the server allows or else it is possible for the producer + * to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large + * message on a certain partition. + */ + public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes"; + + /** + * The id string to pass to the server when making requests. The purpose of this is to be able to track the source + * of requests beyond just ip/port by allowing a logical application name to be included. + */ + public static final String CLIENT_ID_CONFIG = "client.id"; + + /** + * The size of the TCP send buffer to use when fetching data + */ + public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes"; + + /** + * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a + * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. + */ + public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; + + /** metrics.sample.window.ms */ + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; + private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. 
" + + "When a window expires we erase and overwrite the oldest window."; + + /** metrics.num.samples */ + public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; + private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; + + /** metric.reporters */ + public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; + private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + + static { + /* TODO: add config docs */ + config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") + .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah") + .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah") + .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah") + .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), Importance.MEDIUM, "blah blah") + .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah") + .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah") + .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah") + .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah") + .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah") + .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah") + .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah") + .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah") + .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah") + .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah") + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + 30000, + atLeast(0), + Importance.LOW, + METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); + + } + + ConsumerConfig(Map props) { + super(config, props); + } + +} diff --git a/clients/src/main/java/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/kafka/clients/consumer/ConsumerRebalanceCallback.java new file mode 100644 index 0000000..8c4e877 --- /dev/null +++ b/clients/src/main/java/kafka/clients/consumer/ConsumerRebalanceCallback.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; + +/** + * A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. This callback will execute in the user thread as part of the + * {@link Consumer#poll(long, TimeUnit) poll(long)} API on every rebalance attempt. + * Default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the + * {@link #onPartitionsAssigned(Consumer, TopicPartition...) onPartitionsAssigned()} callback. And will commit offsets synchronously + * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, TopicPartition...) onPartitionsRevoked()} + * callback. + */ +public interface ConsumerRebalanceCallback { + + /** + * A callback method the user can implement to provide handling of customized offsets on completion of a successful + * rebalance operation. This method will be called after a rebalance operation completes and before the consumer + * starts fetching data. + *

+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} + * @param partitions The list of partitions that are assigned to the consumer after rebalance + */ + public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions); + + /** + * A callback method the user can implement to provide handling of offset commits to a customized store on the + * start of a rebalance operation. This method will be called before a rebalance operation starts and after the + * consumer stops fetching data. It is recommended that offsets should be committed in this callback to + * either Kafka or a custom offset store to prevent duplicate data + *

+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} + * @param partitions The list of partitions that were assigned to the consumer on the last rebalance + */ + public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions); +} diff --git a/clients/src/main/java/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/kafka/clients/consumer/ConsumerRecord.java new file mode 100644 index 0000000..d54c089 --- /dev/null +++ b/clients/src/main/java/kafka/clients/consumer/ConsumerRecord.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; + +/** + * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the + * record is being received and an offset that points to the record in a Kafka partition. + *
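+ * <p>
+ * A minimal illustrative sketch (the topic, partition and offset values are made up):
+ * <pre>
+ * {@code
+ * ConsumerRecord record = new ConsumerRecord("foo", 0, "hello".getBytes(), 42L);
+ * record.topic();     // "foo"
+ * record.partition(); // the TopicPartition for partition 0 of "foo"
+ * record.key();       // null, since this constructor creates a record with no key
+ * record.offset();    // 42
+ * }
+ * </pre>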

+ */ +public final class ConsumerRecord { + private final TopicPartition partition; + private final byte[] key; + private final byte[] value; + private final long offset; + + /** + * Creates a record to be received from a specified topic and partition + * + * @param topic The topic this record is received from + * @param partition The partition of the topic this record is received from + * @param key The key of the record, if one exists + * @param value The record contents + * @param offset The offset of this record in the corresponding Kafka partition + */ + public ConsumerRecord(String topic, int partition, byte[] key, byte[] value, long offset) { + if (topic == null) + throw new IllegalArgumentException("Topic cannot be null"); + this.partition = new TopicPartition(topic, partition); + this.key = key; + this.value = value; + this.offset = offset; + } + + /** + * Create a record with no key + * + * @param topic The topic this record is received from + * @param partition The partition of the topic this record is received from + * @param value The record contents + * @param offset The offset of this record in the corresponding Kafka partition + */ + public ConsumerRecord(String topic, int partition, byte[] value, long offset) { + this(topic, partition, null, value, offset); + } + + /** + * The topic this record is received from + */ + public String topic() { + return partition.topic(); + } + + /** + * The key (or null if no key is specified) + */ + public byte[] key() { + return key; + } + + /** + * The value + */ + public byte[] value() { + return value; + } + + /** + * The partition from which this record is received + */ + public TopicPartition partition() { + return partition; + } + + /** + * The offset of this record in the corresponding Kafka partition. + */ + public long offset() { + return offset; + } + +} diff --git a/clients/src/main/java/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/kafka/clients/consumer/KafkaConsumer.java new file mode 100644 index 0000000..9d90487 --- /dev/null +++ b/clients/src/main/java/kafka/clients/consumer/KafkaConsumer.java @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+*/ +package kafka.clients.consumer; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.utils.ClientUtils; +import org.apache.kafka.common.utils.SystemTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Kafka client that consumes records from a Kafka cluster. + *

+ * The consumer is thread safe and should generally be shared among all threads for best performance. + *

+ * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it + * needs to communicate with. Failure to close the consumer after use will leak these resources. + *

+ * <h3>Usage Examples</h3>

+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of + * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages + * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as + * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. + *
+ * {@code
+ * Map<TopicPartition, Long> process(List<ConsumerRecord> records) {
+ *   Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+ *   for(int i = 0;i < records.size(); i++) {
+ *     ConsumerRecord record = records.get(i);
+ *     // user specific logic to process record
+ *     processedOffsets.put(record.partition(), record.offset());
+ *   }
+ *   return processedOffsets; 
+ * }
+ * }
+ * 
+ *

+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load + * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, + * as controlled by the auto.commit.interval.ms config + *

+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("auto.commit.enable", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * boolean isRunning = true;
+ * while(isRunning) {
+ *   List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *   process(records);
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load + * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using + * either the commit() or commitAsync() APIs. This example also demonstrates rewinding the consumer's offsets if processing of the consumed + * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets + * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances. + *
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("auto.commit.enable", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     try {
+ *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *         consumedOffsets.putAll(lastConsumedOffsets);
+ *         numRecords += records.size();
+ *         // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *         if(numRecords % commitInterval == 0) 
+ *           consumer.commit();
+ *     } catch(Exception e) {
+ *         try {
+ *             // rewind consumer's offsets for failed partitions
+ *             // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
+ *             List<TopicPartition> failedPartitions = failedPartitions();
+ *             Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+ *             for(TopicPartition failedPartition : failedPartitions) {
+ *                 // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+ *                 // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+ *                 offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ *             }
+ *             // seek to new offsets only for partitions that failed the last process()
+ *             consumer.seek(offsetsToRewindTo);
+ *         } catch(Exception e1) {  break; } // rewind failed
+ *     }
+ * }         
+ * consumer.close();
+ * }
+ * 
+ *

+ * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's
+ * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in
+ * Kafka. If group management is used, the right place to systematically rewind offsets for every consumer instance is inside the
+ * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance
+ * and before consumption restarts after the rebalance. This is the right place to supply the newly rewound offsets to the consumer. If you
+ * foresee ever needing to reset the consumer's offsets while using group management, it is recommended that you always configure the consumer
+ * with a ConsumerRebalanceCallback guarded by a flag that controls whether the offset rewind logic runs. This method of rewinding offsets is
+ * useful if you notice an issue with your message processing after successful consumption and offset commit, and you would like to rewind the
+ * offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case, you would configure each of your
+ * consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance in a rolling restart fashion. Each
+ * restart will trigger a rebalance, and eventually all consumer instances will have rewound the offsets for the partitions they own,
+ * effectively rewinding the offsets for the entire consumer group.

+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("auto.commit.enable", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ *                                            new ConsumerRebalanceCallback() {
+ *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
+ *                                                    if(rewindOffsets) {
+ *                                                        Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+ *                                                        consumer.seek(newOffsets);
+ *                                                    }
+ *                                                }
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    consumer.commit();
+ *                                                }
+ *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
+ *                                                private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+ *                                                                                                long numberOfMessagesToRewindBackTo) {
+ *                                                    Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+ *                                                    for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet())
+ *                                                        newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+ *                                                    return newOffsets;
+ *                                                }
+ *                                            });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     numRecords += records.size();
+ *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *     if(numRecords % commitInterval == 0) 
+ *         consumer.commit(consumedOffsets);
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
+ * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
+ * plug in logic for retrieving the offsets from a custom store and to provide the offsets to the consumer in the ConsumerRebalanceCallback.
+ * The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and
+ * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+ *

+ * Similarly, the user would also be required to plug in logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked
+ * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place
+ * to commit the offsets for the current set of partitions owned by the consumer.
+ *

+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("auto.commit.enable", "false"); // since auto.commit.enable only applies to Kafka based offset storage
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ *                                            new ConsumerRebalanceCallback() {
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+ *                                                    consumer.seek(lastCommittedOffsets);
+ *                                                }
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
+ *                                                    commitOffsetsToCustomStore(offsets); 
+ *                                                }
+ *                                                // following APIs should be implemented by the user for custom offset management
+ *                                                private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
+ *                                                    return null;
+ *                                                }
+ *                                                private Map<TopicPartition, Long> getLastConsumedOffsets(TopicPartition... partitions) { return null; }
+ *                                                private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+ *                                            });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     numRecords += records.size();
+ *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *     if(numRecords % commitInterval == 0) 
+ *         commitOffsetsToCustomStore(consumedOffsets);
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
+ * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group
+ * management is used.
+ *
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("auto.commit.enable", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[2];
+ * partitions[0] = partition0;
+ * partitions[1] = partition1;
+ * consumer.subscribe(partitions);
+ * // find the last committed offsets for partitions 0,1 of topic foo
+ * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(partition0, partition1);
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);        
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     for(TopicPartition partition : partitions) {
+ *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *             isRunning = false;
+ *         else
+ *             isRunning = true;
+ *     }
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use custom offset storage.
+ *
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[2];
+ * partitions[0] = partition0;
+ * partitions[1] = partition1;
+ * consumer.subscribe(partitions);
+ * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);        
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     // commit offsets for partitions 0,1 for topic foo to custom store
+ *     commitOffsetsToCustomStore(consumedOffsets);
+ *     for(TopicPartition partition : partitions) {
+ *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *             isRunning = false;
+ *         else
+ *             isRunning = true;
+ *     }            
+ * }         
+ * consumer.close();
+ * }
+ * 
+ */ +public class KafkaConsumer implements Consumer { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); + + private final long metadataFetchTimeoutMs; + private final long totalMemorySize; + private final Metrics metrics; + private final Set subscribedTopics; + private final Set subscribedPartitions; + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings + * are documented here. Values can be + * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the + * string "42" or the integer 42). + *
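+ * <p>
+ * A minimal sketch of configuring the consumer with a map (the config keys are those defined in {@link ConsumerConfig}; the values are illustrative):
+ * <pre>
+ * {@code
+ * Map<String, Object> config = new HashMap<String, Object>();
+ * config.put("bootstrap.servers", "localhost:9092");
+ * config.put("group.id", "test");
+ * config.put("fetch.min.bytes", 1024); // a numeric config accepts either the number or its string form
+ * KafkaConsumer consumer = new KafkaConsumer(config);
+ * }
+ * </pre>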

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param configs The consumer configs + */ + public KafkaConsumer(Map configs) { + this(new ConsumerConfig(configs), null); + } + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback} + * implementation + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param configs The consumer configs + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + */ + public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) { + this(new ConsumerConfig(configs), callback); + } + + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. + * Valid configuration strings are documented at {@link ConsumerConfig} + */ + public KafkaConsumer(Properties properties) { + this(new ConsumerConfig(properties), null); + } + + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a + * {@link ConsumerRebalanceCallback} implementation. + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param properties The consumer configuration properties + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + */ + public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { + this(new ConsumerConfig(properties), callback); + } + + private KafkaConsumer(ConsumerConfig config) { + this(config, null); + } + + private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) { + log.trace("Starting the Kafka consumer"); + subscribedTopics = new HashSet(); + subscribedPartitions = new HashSet(); + this.metrics = new Metrics(new MetricConfig(), + Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")), + new SystemTime()); + this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + config.logUnused(); + log.debug("Kafka consumer started"); + } + + /** + * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality + *

+ * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and
+ * will trigger a rebalance operation if one of the following events occurs:
+ *

+ * <ul>
+ * <li> The number of partitions changes for any of the subscribed topics
+ * <li> A topic is created or deleted
+ * <li> An existing member of the consumer group dies
+ * <li> A new member is added to an existing consumer group via the join API
+ * </ul>
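+ * <p>
+ * A minimal sketch of incremental subscription (the topic names are illustrative):
+ * <pre>
+ * {@code
+ * consumer.subscribe("foo");
+ * consumer.subscribe("bar"); // the consumer is now subscribed to both foo and bar
+ * }
+ * </pre>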
+ * @param topics A variable list of topics that the consumer wants to subscribe to + */ + @Override + public void subscribe(String... topics) { + if(subscribedPartitions.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); + for(String topic:topics) + subscribedTopics.add(topic); + // TODO: trigger a rebalance operation + } + + /** + * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such, + * there will be no rebalance operation triggered when group membership or cluster and topic metadata change. + *

+ * @param partitions Partitions to incrementally subscribe to + */ + @Override + public void subscribe(TopicPartition... partitions) { + if(subscribedTopics.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); + for(TopicPartition partition:partitions) + subscribedPartitions.add(partition); + } + + /** + * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will not be returned + * from the next {@link #poll(long, TimeUnit) poll()} onwards + * @param topics Topics to unsubscribe from + */ + public void unsubscribe(String... topics) { + // throw an exception if the topic was never subscribed to + for(String topic:topics) { + if(!subscribedTopics.contains(topic)) + throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + + " to unsubscribe(" + topic + ")"); + subscribedTopics.remove(topic); + } + // TODO trigger a rebalance operation + } + + /** + * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next + * {@link #poll(long, TimeUnit) poll()} onwards + * @param partitions Partitions to unsubscribe from + */ + public void unsubscribe(TopicPartition... partitions) { + // throw an exception if the partition was never subscribed to + for(TopicPartition partition:partitions) { + if(!subscribedPartitions.contains(partition)) + throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + + partition.topic() + "," + partition.partition() + ") should be called prior" + + " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); + subscribedPartitions.remove(partition); + } + // trigger a rebalance operation + } + + /** + * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to + * any topics or partitions before polling for data. + *

+ * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)} + * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and + * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset + * using {@link #commit(Map) commit(offsets)} or {@link #commitAsync(Map) commitAsync(offsets)} + * for the subscribed list of partitions. + * @param timeout The time spent waiting in poll if data is not available. If timeout is negative, + * it waits indefinitely for data to come in + * @param timeUnit Unit of the timeout + * @return list of records since the last fetch for the subscribed list of topics and partitions + */ + @Override + public List poll(long timeout, TimeUnit timeUnit) { + // TODO Auto-generated method stub + return null; + } + + /** + * Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance + * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. + * @param offsets The list of offsets per partition that should be committed to Kafka. + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will result in the metadata + * for the commit offset request or throw any exception that occurred while committing the offsets for the specified partitions. + * For example, if {@link #subscribe(String...) subscribe(topics} is used, an attempt to commit offsets for partitions that the + * consumer does not own, fails. + */ + @Override + public Future> commit(Map offsets) { + throw new UnsupportedOperationException(); + } + + /** + * Synchronously commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and + * partitions. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance + * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will result in the metadata + * for the commit offset request or throw any exception that occurred while committing the offsets for the specified partitions. + */ + @Override + public Future> commit() { + throw new UnsupportedOperationException(); + } + + /** + * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long, TimeUnit) poll(timeout)}. If this API is invoked + * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is + * arbitrarily used in the middle of consumption, to reset the fetch offsets + */ + @Override + public void seek(Map offsets) { + } + + /** + * Returns the fetch position of the specified topic partition to be used on the next {@link #poll(long, TimeUnit) poll()} + * @param partition Partition for which the fetch position will be returned + * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long, TimeUnit) poll()} + */ + public long position(TopicPartition partition) { + return 0L; + } + + /** + * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset + * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data. + * @param partitions The list of partitions for which offsets are returned. If no partitions are specified, the offsets for partitions that + * are assigned to this consumer are returned + * @return The list of offsets committed on the last {@link #commit() commit()} or + * {@link #commitAsync() commitAsync()} + */ + @Override + public Map committed(TopicPartition... partitions) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException(); + } + + /** + * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact + * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages + * returned by the consumer. + * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. + * @param partitions The list of partitions for which the offsets are returned + * @return The offsets per partition before the specified timestamp. 
+ */ + public Map offsetsBeforeTime(long timestamp, TopicPartition...partitions) { + return null; + } + + @Override + public Map metrics() { + return Collections.unmodifiableMap(this.metrics.metrics()); + } + + @Override + public void close() { + log.trace("Closing the Kafka consumer."); + subscribedTopics.clear(); + subscribedPartitions.clear(); + this.metrics.close(); + log.debug("The Kafka consumer has closed."); + } +} diff --git a/clients/src/main/java/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/kafka/clients/consumer/MockConsumer.java new file mode 100644 index 0000000..aa7b5d1 --- /dev/null +++ b/clients/src/main/java/kafka/clients/consumer/MockConsumer.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package kafka.clients.consumer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.internals.FutureOffsetMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.TopicPartition; + +/** + * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. + * This class is not threadsafe + *
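+ * <p>
+ * A minimal sketch of how the mock might be used in a test (the topic and partition are illustrative; poll() hands back one dummy record per subscribed partition):
+ * <pre>
+ * {@code
+ * MockConsumer consumer = new MockConsumer();
+ * consumer.subscribe(new TopicPartition("test", 0));
+ * List<ConsumerRecord> records = consumer.poll(0, TimeUnit.MILLISECONDS);
+ * consumer.commit(); // commits the offsets the mock has handed out so far
+ * consumer.close();
+ * }
+ * </pre>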

+ * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it + * needs to communicate with. Failure to close the consumer after use will leak these resources. + */ +public class MockConsumer implements Consumer { + + private final Set subscribedPartitions; + private final Map committedOffsets; + private final Map consumedOffsets; + + public MockConsumer() { + subscribedPartitions = new HashSet(); + committedOffsets = new HashMap(); + consumedOffsets = new HashMap(); + } + + @Override + public void subscribe(String... topics) { + for(String topic : topics) { + subscribedPartitions.add(new TopicPartition(topic, 0)); + } + } + + @Override + public void subscribe(TopicPartition... partitions) { + for(TopicPartition partition : partitions) { + subscribedPartitions.add(partition); + consumedOffsets.put(partition, 0L); + } + } + + public void unsubscribe(String... topics) { + for(String topic : topics) { + unsubscribe(topic); + } + } + + public void unsubscribe(TopicPartition... partitions) { + for(TopicPartition partition : partitions) { + subscribedPartitions.remove(partition); + committedOffsets.remove(partition); + consumedOffsets.remove(partition); + } + } + + @Override + public List poll(long timeout, TimeUnit timeUnit) { + // hand out one dummy record, 1 per topic + List records = new ArrayList(); + for(TopicPartition partition : subscribedPartitions) { + // get the last consumed offset + long messageSequence = consumedOffsets.get(partition); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream outputStream; + try { + outputStream = new ObjectOutputStream(byteStream); + outputStream.writeLong(messageSequence++); + outputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + records.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence)); + consumedOffsets.put(partition, messageSequence); + } + return records; + } + + @Override + public Future> commit(Map offsets) { + Map offsetMetadata = new HashMap(offsets.size()); + for(Entry partitionOffset : offsets.entrySet()) { + committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); + offsetMetadata.put(partitionOffset.getKey(), + new OffsetMetadata(partitionOffset.getKey(), partitionOffset.getValue(), (short)0)); + } + return new FutureOffsetMetadata(offsetMetadata); + } + + @Override + public void seek(Map offsets) { + // change the fetch offsets + for(Entry partitionOffset : offsets.entrySet()) { + consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); + } + } + + @Override + public Map committed(TopicPartition... partitions) { + Map offsets = new HashMap(); + for(TopicPartition partition : partitions) { + offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition)); + } + return offsets; + } + + @Override + public long position(TopicPartition partition) { + return consumedOffsets.get(partition); + } + + @Override + public Map offsetsBeforeTime(long timestamp, + TopicPartition... 
partitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Future> commit() { + return commit(consumedOffsets); + } + + @Override + public Map metrics() { + return null; + } + + @Override + public void close() { + // unsubscribe from all partitions + TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()]; + unsubscribe(subscribedPartitions.toArray(allPartitions)); + } +} diff --git a/clients/src/main/java/kafka/common/TopicPartitionOffset.java b/clients/src/main/java/kafka/common/TopicPartitionOffset.java new file mode 100644 index 0000000..237ed15 --- /dev/null +++ b/clients/src/main/java/kafka/common/TopicPartitionOffset.java @@ -0,0 +1,70 @@ +package kafka.common; + +/** + * A topic name, partition number and offset + */ +public final class TopicPartitionOffset { + + private int hash = 0; + private final int partition; + private final String topic; + private final long offset; + + public TopicPartitionOffset(String topic, int partition, long offset) { + this.partition = partition; + this.topic = topic; + this.offset = offset; + } + + public int partition() { + return partition; + } + + public String topic() { + return topic; + } + + public long offset() { + return offset; + } + + @Override + public int hashCode() { + if (hash != 0) + return hash; + final int prime = 31; + int result = 1; + result = prime * result + (int)(offset ^ (offset >>> 32)); + result = prime * result + partition; + result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + this.hash = result; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TopicPartitionOffset other = (TopicPartitionOffset) obj; + if(offset != other.offset) + return false; + if (partition != other.partition) + return false; + if (topic == null) { + if (other.topic != null) + return false; + } else if (!topic.equals(other.topic)) + return false; + return true; + } + + @Override + public String toString() { + return topic + "-" + partition + "-" + offset; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a6423f4..a7a5150 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -13,7 +13,6 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -44,6 +43,7 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -118,7 +118,7 @@ public class KafkaProducer implements Producer { config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time); - List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); 
this.sender = new Sender(new Selector(this.metrics, time), this.metadata, @@ -150,28 +150,6 @@ public class KafkaProducer implements Producer { } } - private static List parseAndValidateAddresses(List urls) { - List addresses = new ArrayList(); - for (String url : urls) { - if (url != null && url.length() > 0) { - String[] pieces = url.split(":"); - if (pieces.length != 2) - throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - try { - InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); - if (address.isUnresolved()) - throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url); - addresses.add(address); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid port in metadata.broker.list: " + url); - } - } - } - if (addresses.size() < 1) - throw new ConfigException("No bootstrap urls given in metadata.broker.list."); - return addresses; - } - /** * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} */ diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java new file mode 100644 index 0000000..cb33e34 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigException; + +public class ClientUtils { + public static List parseAndValidateAddresses(List urls) { + List addresses = new ArrayList(); + for (String url : urls) { + if (url != null && url.length() > 0) { + String[] pieces = url.split(":"); + if (pieces.length != 2) + throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + try { + InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); + if (address.isUnresolved()) + throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url); + addresses.add(address); + } catch (NumberFormatException e) { + throw new ConfigException("Invalid port in metadata.broker.list: " + url); + } + } + } + if (addresses.size() < 1) + throw new ConfigException("No bootstrap urls given in metadata.broker.list."); + return addresses; + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java new file mode 100644 index 0000000..1b5ef14 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import kafka.common.TopicPartitionOffset; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +/** + * TODO: Clean this after the consumer implementation is complete. Until then, it is useful to write some sample test code using the new APIs + * + */ +public class ConsumerExampleTest { + /** + * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load + * balancing and failure detection. 
This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
+ * as controlled by the auto.commit.interval.ms config
+ */
+    @Test
+    public void testConsumerGroupManagementWithAutoOffsetCommits() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("group.id", "test");
+        props.put("session.timeout.ms", "1000");
+        props.put("auto.commit.enable", "true");
+        props.put("auto.commit.interval.ms", "10000");
+        KafkaConsumer consumer = new KafkaConsumer(props);
+        // subscribe to some topics
+        consumer.subscribe("foo", "bar");
+        boolean isRunning = true;
+        while(isRunning) {
+            List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+            process(records);
+        }
+        consumer.close();
+    }
+
+    /**
+     * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
+     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using
+     * the commit() or commit(offsets) APIs. This example also demonstrates rewinding the consumer's offsets if processing of consumed
+     * messages fails.
+     */
+    @Test
+    public void testConsumerGroupManagementWithManualOffsetCommit() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("group.id", "test");
+        props.put("session.timeout.ms", "1000");
+        props.put("auto.commit.enable", "false");
+        KafkaConsumer consumer = new KafkaConsumer(props);
+        // subscribe to some topics
+        consumer.subscribe("foo", "bar");
+        int commitInterval = 100;
+        int numRecords = 0;
+        boolean isRunning = true;
+        Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+        while(isRunning) {
+            List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+            try {
+                Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+                consumedOffsets.putAll(lastConsumedOffsets);
+                numRecords += records.size();
+                // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+                if(numRecords % commitInterval == 0)
+                    consumer.commit();
+            } catch(Exception e) {
+                // rewind consumer's offsets for failed partitions
+                List<TopicPartition> failedPartitions = getFailedPartitions();
+                Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+                for(TopicPartition failedPartition : failedPartitions) {
+                    // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+                    // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+                    offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+                }
+                // seek to new offsets only for partitions that failed the last process()
+                consumer.seek(offsetsToRewindTo);
+            }
+        }
+        consumer.close();
+    }
+
+    private List<TopicPartition> getFailedPartitions() { return null; }
+
+    /**
+     * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
+     * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
+     * plug in logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
+     * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and
+     * before the consumption restarts post rebalance.
This is the right place to supply offsets from a custom store to the consumer. + */ + @Test + public void testConsumerRebalanceWithCustomOffsetStore() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("group.id", "test"); + props.put("session.timeout.ms", "1000"); + props.put("auto.commit.enable", "true"); + props.put("auto.commit.interval.ms", "10000"); + KafkaConsumer consumer = new KafkaConsumer(props, + new ConsumerRebalanceCallback() { + public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) { + Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions); + consumer.seek(lastCommittedOffsets); + } + public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) { + TopicPartitionOffset[] offsets = getLastConsumedOffsets(partitions); // implemented by the user + commitOffsetsToCustomStore(offsets); // implemented by the user + } + private Map getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) { + return null; + } + private TopicPartitionOffset[] getLastConsumedOffsets(TopicPartition... partitions) { return null; } + private void commitOffsetsToCustomStore(TopicPartitionOffset... offsets) {} + }); + // subscribe to topics + consumer.subscribe("foo", "bar"); + int commitInterval = 100; + int numRecords = 0; + boolean isRunning = true; + while(isRunning) { + List records = consumer.poll(100, TimeUnit.MILLISECONDS); + Map consumedOffsets = process(records); + numRecords += records.size(); + // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance + if(numRecords % commitInterval == 0) + commitOffsetsToCustomStore(consumedOffsets); + } + consumer.close(); + } + + /** + * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage. + * In this example, the assumption made is that the user chooses to use Kafka based offset management. 
+ */
+    @Test
+    public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("group.id", "test");
+        props.put("session.timeout.ms", "1000");
+        props.put("auto.commit.enable", "false");
+        KafkaConsumer consumer = new KafkaConsumer(props,
+            new ConsumerRebalanceCallback() {
+                boolean rewindOffsets = true;
+                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+                    if(rewindOffsets) {
+                        Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
+                        Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+                        consumer.seek(newOffsets);
+                    }
+                }
+                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+                    consumer.commit();
+                }
+                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
+                private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+                                                                long numberOfMessagesToRewindBackTo) {
+                    Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+                    for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) {
+                        newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+                    }
+                    return newOffsets;
+                }
+            });
+        // subscribe to topics
+        consumer.subscribe("foo", "bar");
+        int commitInterval = 100;
+        int numRecords = 0;
+        boolean isRunning = true;
+        while(isRunning) {
+            List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+            Map<TopicPartition, Long> consumedOffsets = process(records);
+            numRecords += records.size();
+            // since this example uses Kafka based offset storage, commit offsets for all partitions of topics foo, bar
+            // synchronously through the consumer rather than to a custom store
+            if(numRecords % commitInterval == 0)
+                consumer.commit();
+        }
+        consumer.close();
+    }
+
+    /**
+     * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+     * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
+     * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+     * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
+     * based offset management. However, session.timeout.ms is not required since the consumer only performs failure detection when group
+     * management is used.
+ */
+    @Test
+    public void testConsumerWithKafkaBasedOffsetManagement() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("group.id", "test");
+        props.put("auto.commit.enable", "true");
+        props.put("auto.commit.interval.ms", "10000");
+        KafkaConsumer consumer = new KafkaConsumer(props);
+        // subscribe to some partitions of topic foo
+        TopicPartition partition0 = new TopicPartition("foo", 0);
+        TopicPartition partition1 = new TopicPartition("foo", 1);
+        TopicPartition[] partitions = new TopicPartition[2];
+        partitions[0] = partition0;
+        partitions[1] = partition1;
+        consumer.subscribe(partitions);
+        // find the last committed offsets for partitions 0,1 of topic foo
+        Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(partition0, partition1);
+        // seek to the last committed offsets to avoid duplicates
+        consumer.seek(lastCommittedOffsets);
+        // find the offsets of the latest available messages to know where to stop consumption
+        Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+        boolean isRunning = true;
+        Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+        while(isRunning) {
+            List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+            consumedOffsets.putAll(process(records));
+            // stop only after every subscribed partition has been consumed up to its latest available offset
+            boolean allCaughtUp = true;
+            for(TopicPartition partition : partitions) {
+                Long consumed = consumedOffsets.get(partition);
+                if(consumed == null || consumed < latestAvailableOffsets.get(partition))
+                    allCaughtUp = false;
+            }
+            isRunning = !allCaughtUp;
+        }
+        consumer.close();
+    }
+
+    /**
+     * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+     * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
+     * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+     * This example assumes that the user chooses to use custom offset storage.
+ */
+    @Test
+    public void testConsumerWithCustomOffsetManagement() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        KafkaConsumer consumer = new KafkaConsumer(props);
+        // subscribe to some partitions of topic foo
+        TopicPartition partition0 = new TopicPartition("foo", 0);
+        TopicPartition partition1 = new TopicPartition("foo", 1);
+        TopicPartition[] partitions = new TopicPartition[2];
+        partitions[0] = partition0;
+        partitions[1] = partition1;
+        consumer.subscribe(partitions);
+        Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+        // seek to the last committed offsets to avoid duplicates
+        consumer.seek(lastCommittedOffsets);
+        // find the offsets of the latest available messages to know where to stop consumption
+        Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+        boolean isRunning = true;
+        Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+        while(isRunning) {
+            List<ConsumerRecord> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+            consumedOffsets.putAll(process(records));
+            // commit offsets for partitions 0,1 of topic foo to the custom store
+            commitOffsetsToCustomStore(consumedOffsets);
+            // stop only after every subscribed partition has been consumed up to its latest available offset
+            boolean allCaughtUp = true;
+            for(TopicPartition partition : partitions) {
+                Long consumed = consumedOffsets.get(partition);
+                if(consumed == null || consumed < latestAvailableOffsets.get(partition))
+                    allCaughtUp = false;
+            }
+            isRunning = !allCaughtUp;
+        }
+        consumer.close();
+    }
+
+    private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() { return null; }
+
+    private void commitOffsetsToCustomStore(Map<TopicPartition, Long> consumedOffsets) {}
+
+    private Map<TopicPartition, Long> process(List<ConsumerRecord> records) {
+        Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+        for(int i = 0; i < records.size(); i++) {
+            ConsumerRecord record = records.get(i);
+            // process the record and track its offset keyed by TopicPartition, so the returned map
+            // can be passed straight to seek() or commit(offsets)
+            processedOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
+        }
+        return processedOffsets;
+    }
+}
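Since the patch also adds MockConsumer, the offset bookkeeping used in the examples above can be unit-tested without a broker. The sketch below is illustrative and not part of the patch: the test class name is made up, and it assumes the patch's FutureOffsetMetadata completes immediately so that get() returns without blocking. It exercises only APIs defined in this patch (subscribe, poll, position, commit, committed, close).

package kafka.clients.consumer;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

public class MockConsumerUsageSketch {

    @Test
    public void pollAdvancesPositionAndCommitRecordsIt() throws Exception {
        MockConsumer consumer = new MockConsumer();
        TopicPartition partition = new TopicPartition("foo", 0);
        consumer.subscribe(partition);

        // the mock hands out one dummy record per subscribed partition on each poll,
        // so the consumed position advances by one
        List<ConsumerRecord> records = consumer.poll(10, TimeUnit.MILLISECONDS);
        assertEquals(1, records.size());
        assertEquals(1L, consumer.position(partition));

        // commit() commits the current consumed offsets; committed() should then reflect them
        consumer.commit().get();
        assertEquals(Long.valueOf(1L), consumer.committed(partition).get(partition));

        consumer.close();
    }
}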