diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java new file mode 100644 index 0000000..fe83fd3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + + +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.TopicPartition; + +/** + * @see KafkaConsumer + * @see MockConsumer + */ +public interface Consumer extends Closeable { + + /** + * Incrementally subscribe to the given list of topics. This API is mutually exclusive to + * {@link #subscribe(TopicPartition...) subscribe(partitions)} + * @param topics A variable list of topics that the consumer subscribes to + */ + public void subscribe(String...topics); + + /** + * Incrementally subscribes to a specific topic and partition. This API is mutually exclusive to + * {@link #subscribe(String...) subscribe(topics)} + * @param partitions Partitions to subscribe to + */ + public void subscribe(TopicPartition... partitions); + + /** + * Unsubscribe from the specific topics. Messages for this topic will not be returned from the next {@link #poll(long, TimeUnit) poll()} + * onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to + * unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)} + * @param topics Topics to unsubscribe from + */ + public void unsubscribe(String... topics); + + /** + * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next + * {@link #poll(long, TimeUnit) poll()} onwards. This should be used in conjunction with + * {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to + * unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)} + * @param partitions Partitions to unsubscribe from + */ + public void unsubscribe(TopicPartition... partitions); + + /** + * Fetches data for the subscribed list of topics and partitions + * @param timeout The time spent waiting in poll if data is not available. If timeout is negative, + * it waits indefinitely for data to come in + * @param timeUnit Unit of the timeout + * @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic partition. 
Availability + * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. + * If no data is available for timeout ms, returns an empty list + */ + public Map poll(long timeout, TimeUnit timeUnit); + + /** + * Commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions. + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the commit + * offset request completes and will throw any exception that occurred while committing the offsets for the subscribed partitions. + */ + public Future commit(); + + /** + * Commits the specified offsets for the specified list of topics and partitions to Kafka and returns immediately after + * sending the commit offset request. + * @param offsets The map of offsets to commit for the given topic partitions + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the commit + * offset request completes and will throw any exception that occurred while committing the offsets for the specified partitions. + */ + public Future commit(Map offsets); + + /** + * Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics + * using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the specified topic partition is not owned by + * the consumer. + * @param offsets The map of fetch positions per topic and partition + */ + public void seek(Map offsets); + + /** + * Returns the fetch position of the specified topic partition to be used on the next {@link #poll(long, TimeUnit) poll()} + * @param partition Partition for which the fetch position will be returned + * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long, TimeUnit) poll()} + */ + public Map position(TopicPartition... partition); + + /** + * Fetches the last committed offsets for the input list of partitions + * @param partitions The list of partitions to return the last committed offset for + * @return The list of offsets for the specified list of partitions + */ + public Map committed(TopicPartition...partitions); + + /** + * Fetches offsets before a certain timestamp + * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. + * @param partitions The list of partitions for which the offsets are returned + * @return The offsets for messages that were written to the server before the specified timestamp. 
+ */ + public Map offsetsBeforeTime(long timestamp, TopicPartition...partitions); + + /** + * Return a map of metrics maintained by the consumer + */ + public Map metrics(); + + /** + * Close this consumer + */ + public void close(); + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java new file mode 100644 index 0000000..1b22a0f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +/** + * The consumer configuration keys + */ +public class ConsumerConfig extends AbstractConfig { + private static final ConfigDef config; + + /** + * The identifier of the group this consumer belongs to. This is required if the consumer uses either the + * group management functionality by using {@link Consumer#subscribe(String...) subscribe(topics)}. This is also required + * if the consumer uses the default Kafka based offset management strategy. + */ + public static final String GROUP_ID_CONFIG = "group.id"; + + /** + * The timeout after which, if the {@link Consumer#poll(long, TimeUnit) poll(timeout)} is not invoked, the consumer is + * marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant + * if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)} + */ + public static final String SESSION_TIMEOUT_MS = "session.timeout.ms"; + + /** + * The number of times a consumer sends a heartbeat to the co-ordinator broker within a {@link #SESSION_TIMEOUT_MS} time window. + * This frequency affects the latency of a rebalance operation since the co-ordinator broker notifies a consumer of a rebalance + * in the heartbeat response. Relevant if the consumer uses the group management functionality by invoking + * {@link Consumer#subscribe(String...) subscribe(topics)} + */ + public static final String HEARTBEAT_FREQUENCY = "heartbeat.frequency"; + + /** + * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form + * host1:port1,host2:port2,.... 
These urls are just used for the initial connection to discover the + * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you + * may want more than one, though, in case a server is down). + */ + public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; + + /** + * If true, periodically commit to Kafka the offsets of messages already returned by the consumer. This committed + * offset will be used when the process fails as the position from which the consumption will begin. + */ + public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"; + + /** + * The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG} + * is turned on. + */ + public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms"; + + /** + * What to do when there is no initial offset in Kafka or if an offset is out of range: + *
    + *
+ * <ul>
+ * <li> smallest: automatically reset the offset to the smallest offset</li>
+ * <li> largest: automatically reset the offset to the largest offset</li>
+ * <li> disable: throw an exception to the consumer if no previous offset is found for the consumer's group</li>
+ * <li> anything else: throw an exception to the consumer</li>
+ * </ul>
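+ * <p>
+ * For illustration only, a minimal sketch of overriding this setting when the consumer is created from a
+ * {@link java.util.Properties} instance (the property name and the value "smallest" are the ones documented in this class):
+ * {@code
+ * Properties props = new Properties();
+ * props.put("auto.offset.reset", "smallest"); // start from the smallest offset when no committed offset exists
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * }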
+ */ + public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset"; + + /** + * The minimum amount of data the server should return for a fetch request. If insufficient data is available the + * request will wait for that much data to accumulate before answering the request. + */ + public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; + + /** + * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient + * data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in + * {@link KafkaConsumer#poll(long, TimeUnit) poll(timeout)} + */ + public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; + + /** + * The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received + * from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within + * this timeout. + */ + public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; + + /** + * The total memory used by the consumer to buffer records received from the server. This config is meant to control + * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions. + */ + public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; + + /** + * The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower + * bound on the consumer's memory utilization when there is at least one message for a partition available on the server. + * This size must be at least as large as the maximum message size the server allows or else it is possible for the producer + * to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large + * message on a certain partition. + */ + public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes"; + + /** + * The id string to pass to the server when making requests. The purpose of this is to be able to track the source + * of requests beyond just ip/port by allowing a logical application name to be included. + */ + public static final String CLIENT_ID_CONFIG = "client.id"; + + /** + * The size of the TCP send buffer to use when fetching data + */ + public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes"; + + /** + * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a + * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. + */ + public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; + + /** metrics.sample.window.ms */ + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; + private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. 
" + + "When a window expires we erase and overwrite the oldest window."; + + /** metrics.num.samples */ + public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; + private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; + + /** metric.reporters */ + public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; + private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + + static { + /* TODO: add config docs */ + config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") + .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah") + .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah") + .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah") + .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah") + .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah") + .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah") + .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah") + .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah") + .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah") + .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah") + .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah") + .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah") + .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah") + .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah") + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + 30000, + atLeast(0), + Importance.LOW, + METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); + + } + + ConsumerConfig(Map props) { + super(config, props); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java new file mode 100644 index 0000000..9482b51 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; + +/** + * A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. This callback will execute in the user thread as part of the + * {@link Consumer#poll(long, TimeUnit) poll(long)} API on every rebalance attempt. + * Default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the + * {@link #onPartitionsAssigned(Consumer, TopicPartition...) onPartitionsAssigned()} callback. And will commit offsets synchronously + * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, TopicPartition...) onPartitionsRevoked()} + * callback. + */ +public interface ConsumerRebalanceCallback { + + /** + * A callback method the user can implement to provide handling of customized offsets on completion of a successful + * rebalance operation. This method will be called after a rebalance operation completes and before the consumer + * starts fetching data. + *

+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} + * @param partitions The list of partitions that are assigned to the consumer after rebalance + */ + public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions); + + /** + * A callback method the user can implement to provide handling of offset commits to a customized store on the + * start of a rebalance operation. This method will be called before a rebalance operation starts and after the + * consumer stops fetching data. It is recommended that offsets should be committed in this callback to + * either Kafka or a custom offset store to prevent duplicate data + *

+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} + * @param partitions The list of partitions that were assigned to the consumer on the last rebalance + */ + public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java new file mode 100644 index 0000000..350f354 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; + +/** + * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the + * record is being received and an offset that points to the record in a Kafka partition. + */ +public final class ConsumerRecord { + private final TopicPartition partition; + private final byte[] key; + private final byte[] value; + private final long offset; + private volatile Exception error; + + /** + * Creates a record to be received from a specified topic and partition + * + * @param topic The topic this record is received from + * @param partitionId The partition of the topic this record is received from + * @param key The key of the record, if one exists + * @param value The record contents + * @param offset The offset of this record in the corresponding Kafka partition + */ + public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) { + this(topic, partitionId, key, value, offset, null); + } + + /** + * Create a record with no key + * + * @param topic The topic this record is received from + * @param partitionId The partition of the topic this record is received from + * @param value The record contents + * @param offset The offset of this record in the corresponding Kafka partition + */ + public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) { + this(topic, partitionId, null, value, offset); + } + + /** + * Creates a record with an error code + * @param topic The topic this record is received from + * @param partitionId The partition of the topic this record is received from + * @param error The exception corresponding to the error code returned by the server for this topic partition + */ + public ConsumerRecord(String topic, int partitionId, Exception error) { + this(topic, partitionId, null, null, -1L, error); + } + + private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) { + if (topic == null) + throw new IllegalArgumentException("Topic cannot be null"); + 
this.partition = new TopicPartition(topic, partitionId); + this.key = key; + this.value = value; + this.offset = offset; + this.error = error; + } + + /** + * The topic this record is received from + */ + public String topic() { + return partition.topic(); + } + + /** + * The partition from which this record is received + */ + public int partition() { + return partition.partition(); + } + + /** + * The TopicPartition object containing the topic and partition + */ + public TopicPartition topicAndPartition() { + return partition; + } + + /** + * The key (or null if no key is specified) + * @throws Exception The exception thrown while fetching this record. + */ + public byte[] key() throws Exception { + if (this.error != null) + throw this.error; + return key; + } + + /** + * The value + * @throws Exception The exception thrown while fetching this record. + */ + public byte[] value() throws Exception { + if (this.error != null) + throw this.error; + return value; + } + + /** + * The offset of this record in the corresponding Kafka partition. + * @throws Exception The exception thrown while fetching this record. + */ + public long offset() throws Exception { + if (this.error != null) + throw this.error; + return offset; + } + + public Exception error() { + return this.error; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java new file mode 100644 index 0000000..2546221 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a + * {@link Consumer#poll(long, java.util.concurrent.TimeUnit)} operation. + */ +public class ConsumerRecords { + + private final String topic; + private final Map> recordsPerPartition; + + public ConsumerRecords(String topic, Map> records) { + this.topic = topic; + this.recordsPerPartition = records; + } + + /** + * @param partitions The input list of partitions for a particular topic. If no partitions are + * specified, returns records for all partitions + * @return The list of {@link ConsumerRecord}s associated with the given partitions. + */ + public List records(int... 
partitions) { + List recordsToReturn = new ArrayList(); + if(partitions.length == 0) { + // return records for all partitions + for(Entry> record : recordsPerPartition.entrySet()) { + recordsToReturn.addAll(record.getValue()); + } + } else { + for(int partition : partitions) { + List recordsForThisPartition = recordsPerPartition.get(partition); + recordsToReturn.addAll(recordsForThisPartition); + } + } + return recordsToReturn; + } + + /** + * @return The topic of all records associated with this instance + */ + public String topic() { + return this.topic; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java new file mode 100644 index 0000000..3e2838f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.utils.ClientUtils; +import org.apache.kafka.common.utils.SystemTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Kafka client that consumes records from a Kafka cluster. + *

+ * The consumer is thread safe and should generally be shared among all threads for best performance. + *

+ * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it + * needs to communicate with. Failure to close the consumer after use will leak these resources. + *
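+ * <p>
+ * As a minimal sketch (using only the constructor and {@link #close()} defined in this class), the consumer can be
+ * released in a finally block so that a processing error does not leak its connections:
+ * {@code
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * try {
+ *     // poll and process records here
+ * } finally {
+ *     consumer.close();
+ * }
+ * }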

+ * <h3>Usage Examples</h3>

+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of + * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages + * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as + * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. + *
+ * {@code
+ * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+ *     Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+ *     for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
+ *          List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+ *          for(int i = 0;i < recordsPerTopic.size();i++) {
+ *               ConsumerRecord record = recordsPerTopic.get(i);
+ *               // process record
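+ *               // record the offset of the latest message processed for this record's partition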
+ *               processedOffsets.put(record.topicAndPartition(), record.offset());
+ *          }
+ *     }
+ *     return processedOffsets; 
+ * }
+ * }
+ * 
+ *

+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load + * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, + * as controlled by the auto.commit.interval.ms config + *

+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * boolean isRunning = true;
+ * while(isRunning) {
+ *   Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *   process(records);
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load + * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using + * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed + * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets + * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances. + *
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     try {
+ *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *         consumedOffsets.putAll(lastConsumedOffsets);
+ *         numRecords += records.size();
+ *         // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *         if(numRecords % commitInterval == 0) 
+ *           consumer.commit();
+ *     } catch(Exception e) {
+ *         try {
+ *             // rewind consumer's offsets for failed partitions
+ *             // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
+ *             List<TopicPartition> failedPartitions = failedPartitions();
+ *             Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+ *             for(TopicPartition failedPartition : failedPartitions) {
+ *                 // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+ *                 // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+ *                 offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ *             }
+ *             // seek to new offsets only for partitions that failed the last process()
+ *             consumer.seek(offsetsToRewindTo);
+ *         } catch(Exception e) {  break; } // rewind failed
+ *     }
+ * }         
+ * consumer.close();
+ * }
+ * 
+ *

+ * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's + * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in + * Kafka. If group management is used, the right place to systematically rewind offsets for every consumer instance is inside the + * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance + * and before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It + * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group management, that you + * always configure the consumer to use the ConsumerRebalanceCallback with a flag that protects whether or not the offset rewind logic is used. + * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit. + * And you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case, + * you would configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance + * in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer instances would have rewound the offsets for + * the partitions they own, effectively rewinding the offsets for the entire consumer group. + *

+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ *                                            new ConsumerRebalanceCallback() {
+ *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
+ *                                                    if(rewindOffsets) {
+ *                                                        Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+ *                                                        consumer.seek(newOffsets);
+ *                                                    }
+ *                                                }
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    consumer.commit();
+ *                                                }
+ *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
+ *                                                private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+ *                                                                                                long numberOfMessagesToRewindBackTo) {
+ *                                                    Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+ *                                                    for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) 
+ *                                                        newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+ *                                                    return newOffsets;
+ *                                                }
+ *                                            });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     numRecords += records.size();
+ *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *     if(numRecords % commitInterval == 0) 
+ *         consumer.commit(consumedOffsets);
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. + * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to + * plug in logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback + * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and + * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer. + *

+ * Similarly, the user would also be required to plug in logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked + * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place + * to commit the offsets for the current set of partitions owned by the consumer. + *

+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ *                                            new ConsumerRebalanceCallback() {
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+ *                                                    consumer.seek(lastCommittedOffsets);
+ *                                                }
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
+ *                                                    commitOffsetsToCustomStore(offsets); 
+ *                                                }
+ *                                                // following APIs should be implemented by the user for custom offset management
+ *                                                private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
+ *                                                    return null;
+ *                                                }
+ *                                                private Map<TopicPartition, Long> getLastConsumedOffsets(TopicPartition... partitions) { return null; }
+ *                                                private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+ *                                            });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     numRecords += records.size();
+ *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *     if(numRecords % commitInterval == 0) 
+ *         commitOffsetsToCustomStore(consumedOffsets);
+ * }
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest + * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes + * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. + * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka + * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group + * management is used. + *
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[2];
+ * partitions[0] = partition0;
+ * partitions[1] = partition1;
+ * consumer.subscribe(partitions);
+ * // find the last committed offsets for partitions 0,1 of topic foo
+ * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(partition0, partition1);
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);        
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     for(TopicPartition partition : partitions) {
+ *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *             isRunning = false;
+ *         else
+ *             isRunning = true;
+ *     }
+ * }
+ * consumer.commit();
+ * consumer.close();
+ * }
+ * 
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest + * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes + * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. + * This example assumes that the user chooses to use custom offset storage. + *
+ * {@code  
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[2];
+ * partitions[0] = partition0;
+ * partitions[1] = partition1;
+ * consumer.subscribe(partitions);
+ * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);        
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *     consumedOffsets.putAll(lastConsumedOffsets);
+ *     // commit offsets for partitions 0,1 for topic foo to custom store
+ *     commitOffsetsToCustomStore(consumedOffsets);
+ *     for(TopicPartition partition : partitions) {
+ *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *             isRunning = false;
+ *         else
+ *             isRunning = true;
+ *     }            
+ * }      
+ * commitOffsetsToCustomStore(consumedOffsets);   
+ * consumer.close();
+ * }
+ * 
+ */ +public class KafkaConsumer implements Consumer { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); + + private final long metadataFetchTimeoutMs; + private final long totalMemorySize; + private final Metrics metrics; + private final Set subscribedTopics; + private final Set subscribedPartitions; + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings + * are documented here. Values can be + * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the + * string "42" or the integer 42). + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param configs The consumer configs + */ + public KafkaConsumer(Map configs) { + this(new ConsumerConfig(configs), null); + } + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback} + * implementation + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param configs The consumer configs + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + */ + public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) { + this(new ConsumerConfig(configs), callback); + } + + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. + * Valid configuration strings are documented at {@link ConsumerConfig} + */ + public KafkaConsumer(Properties properties) { + this(new ConsumerConfig(properties), null); + } + + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a + * {@link ConsumerRebalanceCallback} implementation. + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param properties The consumer configuration properties + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + */ + public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { + this(new ConsumerConfig(properties), callback); + } + + private KafkaConsumer(ConsumerConfig config) { + this(config, null); + } + + private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) { + log.trace("Starting the Kafka consumer"); + subscribedTopics = new HashSet(); + subscribedPartitions = new HashSet(); + this.metrics = new Metrics(new MetricConfig(), + Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")), + new SystemTime()); + this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + config.logUnused(); + log.debug("Kafka consumer started"); + } + + /** + * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality + *

+ * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and + * will trigger a rebalance operation if one of the following events trigger - + *

    + *
  • Number of partitions change for any of the subscribed list of topics + *
  • Topic is created or deleted + *
  • An existing member of the consumer group dies + *
  • A new member is added to an existing consumer group via the join API + *
+ * @param topics A variable list of topics that the consumer wants to subscribe to + */ + @Override + public void subscribe(String... topics) { + if(subscribedPartitions.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); + for(String topic:topics) + subscribedTopics.add(topic); + // TODO: trigger a rebalance operation + } + + /** + * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such, + * there will be no rebalance operation triggered when group membership or cluster and topic metadata change. + *

+ * @param partitions Partitions to incrementally subscribe to + */ + @Override + public void subscribe(TopicPartition... partitions) { + if(subscribedTopics.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); + for(TopicPartition partition:partitions) + subscribedPartitions.add(partition); + } + + /** + * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will not be returned + * from the next {@link #poll(long, TimeUnit) poll()} onwards + * @param topics Topics to unsubscribe from + */ + public void unsubscribe(String... topics) { + // throw an exception if the topic was never subscribed to + for(String topic:topics) { + if(!subscribedTopics.contains(topic)) + throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + + " to unsubscribe(" + topic + ")"); + subscribedTopics.remove(topic); + } + // TODO trigger a rebalance operation + } + + /** + * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next + * {@link #poll(long, TimeUnit) poll()} onwards + * @param partitions Partitions to unsubscribe from + */ + public void unsubscribe(TopicPartition... partitions) { + // throw an exception if the partition was never subscribed to + for(TopicPartition partition:partitions) { + if(!subscribedPartitions.contains(partition)) + throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + + partition.topic() + "," + partition.partition() + ") should be called prior" + + " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); + subscribedPartitions.remove(partition); + } + // trigger a rebalance operation + } + + /** + * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to + * any topics or partitions before polling for data. + *

+ * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)} + * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and + * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset + * using {@link #commit(Map) commit(offsets)} + * for the subscribed list of partitions. + * @param timeout The time spent waiting in poll if data is not available. If timeout is negative, + * it waits indefinitely for data to come in + * @param timeUnit Unit of the timeout + * @return map of topic to records since the last fetch for the subscribed list of topics and partitions + */ + @Override + public Map poll(long timeout, TimeUnit timeUnit) { + // TODO Auto-generated method stub + return null; + } + + /** + * Commits the specified offsets for the specified list of topics and partitions to Kafka. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance + * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. + * @param offsets The list of offsets per partition that should be committed to Kafka. + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will result in the metadata + * for the commit offset request or throw any exception that occurred while committing the offsets for the specified partitions. + * For example, if {@link #subscribe(String...) subscribe(topics} is used, an attempt to commit offsets for partitions that the + * consumer does not own, fails. + */ + @Override + public Future commit(Map offsets) { + throw new UnsupportedOperationException(); + } + + /** + * Commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and + * partitions. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance + * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. + * @return A {@link java.util.concurrent.Future Future} for the {@link OffsetMetadata} that contains the partition, offset and a + * corresponding error code. Invoking {@link java.util.concurrent.Future#get() get()} on this future will result in the metadata + * for the commit offset request or throw any exception that occurred while committing the offsets for the specified partitions. + */ + @Override + public Future commit() { + throw new UnsupportedOperationException(); + } + + /** + * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long, TimeUnit) poll(timeout)}. If this API is invoked + * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is + * arbitrarily used in the middle of consumption, to reset the fetch offsets + */ + @Override + public void seek(Map offsets) { + } + + /** + * Returns the fetch position of the specified topic partition to be used on the next {@link #poll(long, TimeUnit) poll()} + * @param partition Partition for which the fetch position will be returned + * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long, TimeUnit) poll()} + */ + public Map position(TopicPartition... partition) { + return null; + } + + /** + * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset + * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data. + * @param partitions The list of partitions for which offsets are returned. If no partitions are specified, the offsets for partitions that + * are assigned to this consumer are returned + * @return The list of offsets committed on the last {@link #commit() commit()} + */ + @Override + public Map committed(TopicPartition... partitions) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException(); + } + + /** + * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact + * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages + * returned by the consumer. + * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. + * @param partitions The list of partitions for which the offsets are returned + * @return The offsets per partition before the specified timestamp. 
+ */ + public Map offsetsBeforeTime(long timestamp, TopicPartition...partitions) { + return null; + } + + @Override + public Map metrics() { + return Collections.unmodifiableMap(this.metrics.metrics()); + } + + @Override + public void close() { + log.trace("Closing the Kafka consumer."); + subscribedTopics.clear(); + subscribedPartitions.clear(); + this.metrics.close(); + log.debug("The Kafka consumer has closed."); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java new file mode 100644 index 0000000..3c66065 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.internals.FutureOffsetMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.TopicPartition; + +/** + * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. + * This class is not threadsafe + *

+ * The mock consumer runs entirely in the user thread and performs no network I/O; it simply tracks subscriptions + * and offsets in memory and hands back generated records from poll(). + */ +public class MockConsumer implements Consumer { + + private final Set subscribedPartitions; + private final Set subscribedTopics; + private final Map committedOffsets; + private final Map consumedOffsets; + + public MockConsumer() { + subscribedPartitions = new HashSet(); + subscribedTopics = new HashSet(); + committedOffsets = new HashMap(); + consumedOffsets = new HashMap(); + } + + @Override + public void subscribe(String... topics) { + if(subscribedPartitions.size() > 0) + throw new IllegalStateException("Subscription to topics and partitions is mutually exclusive"); + for(String topic : topics) { + subscribedTopics.add(topic); + } + } + + @Override + public void subscribe(TopicPartition... partitions) { + if(subscribedTopics.size() > 0) + throw new IllegalStateException("Subscription to topics and partitions is mutually exclusive"); + for(TopicPartition partition : partitions) { + subscribedPartitions.add(partition); + consumedOffsets.put(partition, 0L); + } + } + + public void unsubscribe(String... topics) { + // throw an exception if the topic was never subscribed to + for(String topic:topics) { + if(!subscribedTopics.contains(topic)) + throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + + " to unsubscribe(" + topic + ")"); + subscribedTopics.remove(topic); + } + } + + public void unsubscribe(TopicPartition... partitions) { + // throw an exception if the partition was never subscribed to + for(TopicPartition partition:partitions) { + if(!subscribedPartitions.contains(partition)) + throw new IllegalStateException("Partition " + partition + " was never subscribed to. 
subscribe(new TopicPartition(" + + partition.topic() + "," + partition.partition() + ") should be called prior" + + " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); + subscribedPartitions.remove(partition); + committedOffsets.remove(partition); + consumedOffsets.remove(partition); + } + } + + @Override + public Map poll(long timeout, TimeUnit timeUnit) { + // hand out one dummy record, 1 per topic + Map> records = new HashMap>(); + Map recordMetadata = new HashMap(); + for(TopicPartition partition : subscribedPartitions) { + // get the last consumed offset + long messageSequence = consumedOffsets.get(partition); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream outputStream; + try { + outputStream = new ObjectOutputStream(byteStream); + outputStream.writeLong(messageSequence++); + outputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + List recordsForTopic = records.get(partition.topic()); + if(recordsForTopic == null) { + recordsForTopic = new ArrayList(); + records.put(partition.topic(), recordsForTopic); + } + recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence)); + consumedOffsets.put(partition, messageSequence); + } + for(Entry> recordsPerTopic : records.entrySet()) { + Map> recordsPerPartition = new HashMap>(); + for(ConsumerRecord record : recordsPerTopic.getValue()) { + List recordsForThisPartition = recordsPerPartition.get(record.partition()); + if(recordsForThisPartition == null) { + recordsForThisPartition = new ArrayList(); + recordsPerPartition.put(record.partition(), recordsForThisPartition); + } + recordsForThisPartition.add(record); + } + recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition)); + } + return recordMetadata; + } + + @Override + public Future commit(Map offsets) { + for(Entry partitionOffset : offsets.entrySet()) { + committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); + } + return new FutureOffsetMetadata(offsets); + } + + @Override + public void seek(Map offsets) { + // change the fetch offsets + for(Entry partitionOffset : offsets.entrySet()) { + consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); + } + } + + @Override + public Map committed(TopicPartition... partitions) { + Map offsets = new HashMap(); + for(TopicPartition partition : partitions) { + offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition)); + } + return offsets; + } + + @Override + public Map position(TopicPartition... partitions) { + Map positions = new HashMap(); + for(TopicPartition partition : partitions) { + positions.put(partition, consumedOffsets.get(partition)); + } + return positions; + } + + @Override + public Map offsetsBeforeTime(long timestamp, + TopicPartition... 
partitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Future commit() { + return commit(consumedOffsets); + } + + @Override + public Map metrics() { + return null; + } + + @Override + public void close() { + // unsubscribe from all partitions + TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()]; + unsubscribe(subscribedPartitions.toArray(allPartitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java new file mode 100644 index 0000000..a94026a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; + +/** + * The metadata for an offset commit that has been acknowledged by the server + */ +public final class OffsetMetadata { + + private final Map offsets; + private final Map errors; + + public OffsetMetadata(Map offsets, Map errors) { + super(); + this.offsets = offsets; + this.errors = errors; + } + + public OffsetMetadata(Map offsets) { + this(offsets, null); + } + + /** + * The offset of the record in the topic/partition. 
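+ * If the commit carried errors, this method throws the exception recorded for the given partition instead of
+ * returning an offset; use {@link #error(TopicPartition)} to inspect failures without throwing.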
+ */ + public long offset(TopicPartition partition) { + if(this.errors != null) + throw errors.get(partition); + return offsets.get(partition); + } + + /** + * @return The exception corresponding to the error code returned by the server + */ + public RuntimeException error(TopicPartition partition) { + if(errors != null) + return errors.get(partition); + else + return null; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java new file mode 100644 index 0000000..06af43d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java @@ -0,0 +1,53 @@ +package org.apache.kafka.clients.consumer.internals; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +import org.apache.kafka.clients.consumer.OffsetMetadata; +import org.apache.kafka.common.TopicPartition; + +public final class FutureOffsetMetadata implements Future { + + private final Map offsets; + private final Map errors; + + public FutureOffsetMetadata(Map offsets, Map errors) { + this.offsets = offsets; + this.errors = errors; + } + + public FutureOffsetMetadata(Map offsets) { + this(offsets, null); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public OffsetMetadata get() throws InterruptedException, ExecutionException { + return new OffsetMetadata(offsets, errors); + } + + @Override + public OffsetMetadata get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return new OffsetMetadata(offsets, errors); + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a6423f4..a7a5150 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -13,7 +13,6 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -44,6 +43,7 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -118,7 +118,7 @@ public class KafkaProducer implements Producer { config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time); - List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); this.sender = new Sender(new Selector(this.metrics, time), this.metadata, @@ -150,28 +150,6 @@ public class KafkaProducer implements Producer { } } - private static 
List parseAndValidateAddresses(List urls) { - List addresses = new ArrayList(); - for (String url : urls) { - if (url != null && url.length() > 0) { - String[] pieces = url.split(":"); - if (pieces.length != 2) - throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - try { - InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); - if (address.isUnresolved()) - throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url); - addresses.add(address); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid port in metadata.broker.list: " + url); - } - } - } - if (addresses.size() < 1) - throw new ConfigException("No bootstrap urls given in metadata.broker.list."); - return addresses; - } - /** * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} */ diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java new file mode 100644 index 0000000..cb33e34 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigException; + +public class ClientUtils { + public static List parseAndValidateAddresses(List urls) { + List addresses = new ArrayList(); + for (String url : urls) { + if (url != null && url.length() > 0) { + String[] pieces = url.split(":"); + if (pieces.length != 2) + throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + try { + InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); + if (address.isUnresolved()) + throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url); + addresses.add(address); + } catch (NumberFormatException e) { + throw new ConfigException("Invalid port in metadata.broker.list: " + url); + } + } + } + if (addresses.size() < 1) + throw new ConfigException("No bootstrap urls given in metadata.broker.list."); + return addresses; + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java new file mode 100644 index 0000000..57269da --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. +*/ +package org.apache.kafka.clients.consumer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import kafka.common.TopicPartitionOffset; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +/** + * TODO: Clean this after the consumer implementation is complete. Until then, it is useful to write some sample test code using the new APIs + * + */ +public class ConsumerExampleTest { + /** + * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load + * balancing and failure detection. 
This example assumes that the offsets are stored in Kafka and are automatically committed periodically, + * as controlled by the auto.commit.interval.ms config + */ + @Test + public void testConsumerGroupManagementWithAutoOffsetCommits() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("group.id", "test"); + props.put("session.timeout.ms", "1000"); + props.put("auto.commit.enable", "true"); + props.put("auto.commit.interval.ms", "10000"); + KafkaConsumer consumer = new KafkaConsumer(props); + // subscribe to some topics + consumer.subscribe("foo", "bar"); + boolean isRunning = true; + while(isRunning) { + Map records = consumer.poll(100, TimeUnit.MILLISECONDS); + process(records); + } + consumer.close(); + } + + /** + * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load + * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using + * either the commit() or commitAsync() APIs. This example also demonstrates rewinding the consumer's offsets if processing of consumed + * messages fails. + */ + @Test + public void testConsumerGroupManagementWithManualOffsetCommit() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("group.id", "test"); + props.put("session.timeout.ms", "1000"); + props.put("auto.commit.enable", "false"); + KafkaConsumer consumer = new KafkaConsumer(props); + // subscribe to some topics + consumer.subscribe("foo", "bar"); + int commitInterval = 100; + int numRecords = 0; + boolean isRunning = true; + Map consumedOffsets = new HashMap(); + while(isRunning) { + Map records = consumer.poll(100, TimeUnit.MILLISECONDS); + try { + Map lastConsumedOffsets = process(records); + consumedOffsets.putAll(lastConsumedOffsets); + numRecords += records.size(); + // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance + if(numRecords % commitInterval == 0) + consumer.commit(); + } catch(Exception e) { + // rewind consumer's offsets for failed partitions + List failedPartitions = getFailedPartitions(); + Map offsetsToRewindTo = new HashMap(); + for(TopicPartition failedPartition : failedPartitions) { + // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset + // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to. + offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition)); + } + // seek to new offsets only for partitions that failed the last process() + consumer.seek(offsetsToRewindTo); + } + } + consumer.close(); + } + + private List getFailedPartitions() { return null; } + + /** + * This example demonstrates the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. + * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to + * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback + * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and + * before the consumption restarts post rebalance. 
This is the right place to supply offsets from a custom store to the consumer. + */ + @Test + public void testConsumerRebalanceWithCustomOffsetStore() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("group.id", "test"); + props.put("session.timeout.ms", "1000"); + props.put("auto.commit.enable", "true"); + props.put("auto.commit.interval.ms", "10000"); + KafkaConsumer consumer = new KafkaConsumer(props, + new ConsumerRebalanceCallback() { + public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) { + Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions); + consumer.seek(lastCommittedOffsets); + } + public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) { + TopicPartitionOffset[] offsets = getLastConsumedOffsets(partitions); // implemented by the user + commitOffsetsToCustomStore(offsets); // implemented by the user + } + private Map getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) { + return null; + } + private TopicPartitionOffset[] getLastConsumedOffsets(TopicPartition... partitions) { return null; } + private void commitOffsetsToCustomStore(TopicPartitionOffset... offsets) {} + }); + // subscribe to topics + consumer.subscribe("foo", "bar"); + int commitInterval = 100; + int numRecords = 0; + boolean isRunning = true; + while(isRunning) { + Map records = consumer.poll(100, TimeUnit.MILLISECONDS); + Map consumedOffsets = process(records); + numRecords += records.size(); + // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance + if(numRecords % commitInterval == 0) + commitOffsetsToCustomStore(consumedOffsets); + } + consumer.close(); + } + + /** + * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage. + * In this example, the assumption made is that the user chooses to use Kafka based offset management. 
+ */ + @Test + public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("group.id", "test"); + props.put("session.timeout.ms", "1000"); + props.put("auto.commit.enable", "false"); + KafkaConsumer consumer = new KafkaConsumer(props, + new ConsumerRebalanceCallback() { + boolean rewindOffsets = true; + public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) { + if(rewindOffsets) { + Map latestCommittedOffsets = consumer.committed(partitions); + Map newOffsets = rewindOffsets(latestCommittedOffsets, 100); + consumer.seek(newOffsets); + } + } + public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) { + consumer.commit(); + } + // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages + private Map rewindOffsets(Map currentOffsets, + long numberOfMessagesToRewindBackTo) { + Map newOffsets = new HashMap(); + for(Map.Entry offset : currentOffsets.entrySet()) { + newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo); + } + return newOffsets; + } + }); + // subscribe to topics + consumer.subscribe("foo", "bar"); + int commitInterval = 100; + int numRecords = 0; + boolean isRunning = true; + while(isRunning) { + Map records = consumer.poll(100, TimeUnit.MILLISECONDS); + Map consumedOffsets = process(records); + numRecords += records.size(); + // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance + if(numRecords % commitInterval == 0) + commitOffsetsToCustomStore(consumedOffsets); + } + consumer.close(); + } + + /** + * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest + * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes + * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. + * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka + * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does failure detection with group + * management. 
+ */ + @Test + public void testConsumerWithKafkaBasedOffsetManagement() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("group.id", "test"); + props.put("auto.commit.enable", "true"); + props.put("auto.commit.interval.ms", "10000"); + KafkaConsumer consumer = new KafkaConsumer(props); + // subscribe to some partitions of topic foo + TopicPartition partition0 = new TopicPartition("foo", 0); + TopicPartition partition1 = new TopicPartition("foo", 1); + TopicPartition[] partitions = new TopicPartition[2]; + partitions[0] = partition0; + partitions[1] = partition1; + consumer.subscribe(partitions); + // find the last committed offsets for partitions 0,1 of topic foo + Map lastCommittedOffsets = consumer.committed(partition0, partition1); + // seek to the last committed offsets to avoid duplicates + consumer.seek(lastCommittedOffsets); + // find the offsets of the latest available messages to know where to stop consumption + Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1); + boolean isRunning = true; + while(isRunning) { + Map records = consumer.poll(100, TimeUnit.MILLISECONDS); + Map consumedOffsets = process(records); + for(TopicPartition partition : partitions) { + if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition)) + isRunning = false; + else + isRunning = true; + } + } + consumer.close(); + } + + /** + * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest + * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes + * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. + * This example assumes that the user chooses to use custom offset storage. 
+ */ + @Test + public void testConsumerWithCustomOffsetManagement() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + KafkaConsumer consumer = new KafkaConsumer(props); + // subscribe to some partitions of topic foo + TopicPartition partition0 = new TopicPartition("foo", 0); + TopicPartition partition1 = new TopicPartition("foo", 1); + TopicPartition[] partitions = new TopicPartition[2]; + partitions[0] = partition0; + partitions[1] = partition1; + consumer.subscribe(partitions); + Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(); + // seek to the last committed offsets to avoid duplicates + consumer.seek(lastCommittedOffsets); + // find the offsets of the latest available messages to know where to stop consumption + Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1); + boolean isRunning = true; + while(isRunning) { + Map records = consumer.poll(100, TimeUnit.MILLISECONDS); + Map consumedOffsets = process(records); + // commit offsets for partitions 0,1 for topic foo to custom store + commitOffsetsToCustomStore(consumedOffsets); + for(TopicPartition partition : partitions) { + if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition)) + isRunning = false; + else + isRunning = true; + } + } + consumer.close(); + } + + private Map getLastCommittedOffsetsFromCustomStore() { return null; } + private void commitOffsetsToCustomStore(Map consumedOffsets) {} + private Map process(Map records) { + Map processedOffsets = new HashMap(); + for(Entry recordMetadata : records.entrySet()) { + List recordsPerTopic = recordMetadata.getValue().records(); + for(int i = 0;i < recordsPerTopic.size();i++) { + ConsumerRecord record = recordsPerTopic.get(i); + // process record + processedOffsets.put(record.partition(), record.offset()); + } + } + return processedOffsets; + } +}
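
None of the examples above inspect the result of a commit. The sketch below is not part of the patch; the class name, partition, and offset values are made up for illustration. It shows how the Future returned by commit(offsets) and the OffsetMetadata type introduced here could be checked, using MockConsumer so it runs standalone:

package org.apache.kafka.clients.consumer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch (not part of the patch): checking the outcome of an offset commit
// via the Future/OffsetMetadata contract defined above.
public class CommitResultExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Consumer consumer = new MockConsumer();

        // offsets to commit; the partition and offset are illustrative
        Map<TopicPartition, Long> offsetsToCommit = new HashMap<TopicPartition, Long>();
        offsetsToCommit.put(new TopicPartition("foo", 0), 42L);

        // commit() returns immediately; get() blocks until the commit request completes
        Future<OffsetMetadata> result = consumer.commit(offsetsToCommit);
        OffsetMetadata metadata = result.get();

        for (TopicPartition partition : offsetsToCommit.keySet()) {
            RuntimeException error = metadata.error(partition);
            if (error != null) {
                // the commit failed for this partition; offset(partition) would rethrow this error
                System.err.println("Commit failed for " + partition + ": " + error);
            } else {
                System.out.println("Committed offset " + metadata.offset(partition) + " for " + partition);
            }
        }
    }
}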