diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java deleted file mode 100644 index 227f564..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import java.io.Closeable; -import java.util.Collection; -import java.util.Map; - -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.TopicPartition; - -/** - * @see KafkaConsumer - * @see MockConsumer - */ -public interface Consumer extends Closeable { - - /** - * Incrementally subscribe to the given list of topics. This API is mutually exclusive to - * {@link #subscribe(TopicPartition...) subscribe(partitions)} - * @param topics A variable list of topics that the consumer subscribes to - */ - public void subscribe(String...topics); - - /** - * Incrementally subscribes to a specific topic and partition. This API is mutually exclusive to - * {@link #subscribe(String...) subscribe(topics)} - * @param partitions Partitions to subscribe to - */ - public void subscribe(TopicPartition... partitions); - - /** - * Unsubscribe from the specific topics. Messages for this topic will not be returned from the next {@link #poll(long) poll()} - * onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to - * unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)} - * @param topics Topics to unsubscribe from - */ - public void unsubscribe(String... topics); - - /** - * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next - * {@link #poll(long) poll()} onwards. This should be used in conjunction with - * {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to - * unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)} - * @param partitions Partitions to unsubscribe from - */ - public void unsubscribe(TopicPartition... partitions); - - /** - * Fetches data for the subscribed list of topics and partitions - * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative - * @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic partition. Availability - * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. 
- * If no data is available for timeout ms, returns an empty list - */ - public Map poll(long timeout); - - /** - * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. - * @param sync If true, the commit should block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false - */ - public OffsetMetadata commit(boolean sync); - - /** - * Commits the specified offsets for the specified list of topics and partitions to Kafka. - * @param offsets The map of offsets to commit for the given topic partitions - * @param sync If true, commit will block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false. - */ - public OffsetMetadata commit(Map offsets, boolean sync); - - /** - * Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics - * using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the specified topic partition is not owned by - * the consumer. - * @param offsets The map of fetch positions per topic and partition - */ - public void seek(Map offsets); - - /** - * Returns the fetch position of the next message for the specified topic partition to be used on the next {@link #poll(long) poll()} - * @param partitions Partitions for which the fetch position will be returned - * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()} - */ - public Map position(Collection partitions); - - /** - * Fetches the last committed offsets for the input list of partitions - * @param partitions The list of partitions to return the last committed offset for - * @return The list of offsets for the specified list of partitions - */ - public Map committed(Collection partitions); - - /** - * Fetches offsets before a certain timestamp - * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. - * @param partitions The list of partitions for which the offsets are returned - * @return The offsets for messages that were written to the server before the specified timestamp. - */ - public Map offsetsBeforeTime(long timestamp, Collection partitions); - - /** - * Return a map of metrics maintained by the consumer - */ - public Map metrics(); - - /** - * Close this consumer - */ - public void close(); - -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java deleted file mode 100644 index 46efc0c..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; - -import java.util.Map; - -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; - -/** - * The consumer configuration keys - */ -public class ConsumerConfig extends AbstractConfig { - private static final ConfigDef config; - - /** - * The identifier of the group this consumer belongs to. This is required if the consumer uses either the - * group management functionality by using {@link Consumer#subscribe(String...) subscribe(topics)}. This is also required - * if the consumer uses the default Kafka based offset management strategy. - */ - public static final String GROUP_ID_CONFIG = "group.id"; - - /** - * The timeout after which, if the {@link Consumer#poll(long) poll(timeout)} is not invoked, the consumer is - * marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant - * if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)} - */ - public static final String SESSION_TIMEOUT_MS = "session.timeout.ms"; - - /** - * The number of times a consumer sends a heartbeat to the co-ordinator broker within a {@link #SESSION_TIMEOUT_MS} time window. - * This frequency affects the latency of a rebalance operation since the co-ordinator broker notifies a consumer of a rebalance - * in the heartbeat response. Relevant if the consumer uses the group management functionality by invoking - * {@link Consumer#subscribe(String...) subscribe(topics)} - */ - public static final String HEARTBEAT_FREQUENCY = "heartbeat.frequency"; - - /** - * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form - * host1:port1,host2:port2,.... These urls are just used for the initial connection to discover the - * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you - * may want more than one, though, in case a server is down). - */ - public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; - - /** - * If true, periodically commit to Kafka the offsets of messages already returned by the consumer. This committed - * offset will be used when the process fails as the position from which the consumption will begin. - */ - public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"; - - /** - * The friendly name of the partition assignment strategy that the server will use to distribute partition ownership - * amongst consumer instances when group management is used - */ - public static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy"; - - /** - * The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG} - * is turned on. 
- */ - public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms"; - - /** - * What to do when there is no initial offset in Kafka or if an offset is out of range: - *
    - *
  • smallest: automatically reset the offset to the smallest offset - *
  • largest: automatically reset the offset to the largest offset - *
  • disable: throw exception to the consumer if no previous offset is found for the consumer's group - *
  • anything else: throw exception to the consumer. - *
- */ - public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset"; - - /** - * The minimum amount of data the server should return for a fetch request. If insufficient data is available the - * request will wait for that much data to accumulate before answering the request. - */ - public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; - - /** - * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient - * data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in - * {@link KafkaConsumer#poll(long) poll(timeout)} - */ - public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; - - /** - * The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received - * from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within - * this timeout. - */ - public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; - - /** - * The total memory used by the consumer to buffer records received from the server. This config is meant to control - * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions. - */ - public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; - - /** - * The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower - * bound on the consumer's memory utilization when there is at least one message for a partition available on the server. - * This size must be at least as large as the maximum message size the server allows or else it is possible for the producer - * to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large - * message on a certain partition. - */ - public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes"; - - /** - * The id string to pass to the server when making requests. The purpose of this is to be able to track the source - * of requests beyond just ip/port by allowing a logical application name to be included. - */ - public static final String CLIENT_ID_CONFIG = "client.id"; - - /** - * The size of the TCP send buffer to use when fetching data - */ - public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes"; - - /** - * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a - * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. - */ - public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; - - /** metrics.sample.window.ms */ - public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; - private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. 
" - + "When a window expires we erase and overwrite the oldest window."; - - /** metrics.num.samples */ - public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; - private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; - - /** metric.reporters */ - public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; - private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; - - static { - /* TODO: add config docs */ - config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") - .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah") - .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah") - .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah") - .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah") - .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah") - .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah") - .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah") - .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah") - .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah") - .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah") - .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah") - .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah") - .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah") - .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah") - .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah") - .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, - Type.LONG, - 30000, - atLeast(0), - Importance.LOW, - METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); - - } - - ConsumerConfig(Map props) { - super(config, props); - } - -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java deleted file mode 100644 index 05eb6ce..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import java.util.Collection; - -import org.apache.kafka.common.TopicPartition; - -/** - * A callback interface that the user can implement to manage customized offsets on the start and end of - * every rebalance operation. This callback will execute in the user thread as part of the - * {@link Consumer#poll(long) poll(long)} API on every rebalance attempt. - * Default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the - * {@link #onPartitionsAssigned(Consumer, TopicPartition...) onPartitionsAssigned()} callback. And will commit offsets synchronously - * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, TopicPartition...) onPartitionsRevoked()} - * callback. - */ -public interface ConsumerRebalanceCallback { - - /** - * A callback method the user can implement to provide handling of customized offsets on completion of a successful - * rebalance operation. This method will be called after a rebalance operation completes and before the consumer - * starts fetching data. - *

- * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} - * @param partitions The list of partitions that are assigned to the consumer after rebalance - */ - public void onPartitionsAssigned(Consumer consumer, Collection partitions); - - /** - * A callback method the user can implement to provide handling of offset commits to a customized store on the - * start of a rebalance operation. This method will be called before a rebalance operation starts and after the - * consumer stops fetching data. It is recommended that offsets should be committed in this callback to - * either Kafka or a custom offset store to prevent duplicate data - *

- * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} - * @param partitions The list of partitions that were assigned to the consumer on the last rebalance - */ - public void onPartitionsRevoked(Consumer consumer, Collection partitions); -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java deleted file mode 100644 index 436d8a4..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.common.TopicPartition; - -/** - * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the - * record is being received and an offset that points to the record in a Kafka partition. - */ -public final class ConsumerRecord { - private final TopicPartition partition; - private final byte[] key; - private final byte[] value; - private final long offset; - private volatile Exception error; - - /** - * Creates a record to be received from a specified topic and partition - * - * @param topic The topic this record is received from - * @param partitionId The partition of the topic this record is received from - * @param key The key of the record, if one exists - * @param value The record contents - * @param offset The offset of this record in the corresponding Kafka partition - */ - public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) { - this(topic, partitionId, key, value, offset, null); - } - - /** - * Create a record with no key - * - * @param topic The topic this record is received from - * @param partitionId The partition of the topic this record is received from - * @param value The record contents - * @param offset The offset of this record in the corresponding Kafka partition - */ - public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) { - this(topic, partitionId, null, value, offset); - } - - /** - * Creates a record with an error code - * @param topic The topic this record is received from - * @param partitionId The partition of the topic this record is received from - * @param error The exception corresponding to the error code returned by the server for this topic partition - */ - public ConsumerRecord(String topic, int partitionId, Exception error) { - this(topic, partitionId, null, null, -1L, error); - } - - private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) { - if (topic == null) - throw new IllegalArgumentException("Topic cannot be null"); - 
this.partition = new TopicPartition(topic, partitionId); - this.key = key; - this.value = value; - this.offset = offset; - this.error = error; - } - - /** - * The topic this record is received from - */ - public String topic() { - return partition.topic(); - } - - /** - * The partition from which this record is received - */ - public int partition() { - return partition.partition(); - } - - /** - * The TopicPartition object containing the topic and partition - */ - public TopicPartition topicAndPartition() { - return partition; - } - - /** - * The key (or null if no key is specified) - * @throws Exception The exception thrown while fetching this record. - */ - public byte[] key() throws Exception { - if (this.error != null) - throw this.error; - return key; - } - - /** - * The value - * @throws Exception The exception thrown while fetching this record. - */ - public byte[] value() throws Exception { - if (this.error != null) - throw this.error; - return value; - } - - /** - * The position of this record in the corresponding Kafka partition. - * @throws Exception The exception thrown while fetching this record. - */ - public long offset() throws Exception { - if (this.error != null) - throw this.error; - return offset; - } - - public Exception error() { - return this.error; - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java deleted file mode 100644 index 2ecfc8a..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -/** - * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a - * {@link Consumer#poll(long)} operation. - */ -public class ConsumerRecords { - - private final String topic; - private final Map> recordsPerPartition; - - public ConsumerRecords(String topic, Map> records) { - this.topic = topic; - this.recordsPerPartition = records; - } - - /** - * @param partitions The input list of partitions for a particular topic. If no partitions are - * specified, returns records for all partitions - * @return The list of {@link ConsumerRecord}s associated with the given partitions. - */ - public List records(int... 
partitions) { - List recordsToReturn = new ArrayList(); - if(partitions.length == 0) { - // return records for all partitions - for(Entry> record : recordsPerPartition.entrySet()) { - recordsToReturn.addAll(record.getValue()); - } - } else { - for(int partition : partitions) { - List recordsForThisPartition = recordsPerPartition.get(partition); - recordsToReturn.addAll(recordsForThisPartition); - } - } - return recordsToReturn; - } - - /** - * @return The topic of all records associated with this instance - */ - public String topic() { - return this.topic; - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java deleted file mode 100644 index 18bcc90..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ /dev/null @@ -1,575 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.utils.ClientUtils; -import org.apache.kafka.common.utils.SystemTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Kafka client that consumes records from a Kafka cluster. - *

- * The consumer is thread safe and should generally be shared among all threads for best performance. - *

- * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it - * needs to communicate with. Failure to close the consumer after use will leak these resources. - *

- * Usage Examples

- * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of - * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages - * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as - * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. - *
- * {@code
- * private Map process(Map records) {
- *     Map processedOffsets = new HashMap();
- *     for(Entry recordMetadata : records.entrySet()) {
- *          List recordsPerTopic = recordMetadata.getValue().records();
- *          for(int i = 0;i < recordsPerTopic.size();i++) {
- *               ConsumerRecord record = recordsPerTopic.get(i);
- *               // process record
- *               processedOffsets.put(record.partition(), record.offset());                
- *          }
- *     }
- *     return processedOffsets; 
- * }
- * }
- * 
- *

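As rendered above, the process() sketch omits its generic type parameters, and ConsumerRecord.offset() is declared to throw the per-record fetch error, so a compilable variant needs a try/catch. Below is one such variant, under the assumptions that poll() returns a Map<String, ConsumerRecords> and that offsets are reported per TopicPartition (matching how the later examples feed the result into seek() and commit()); the RecordProcessor class name is only for illustration.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class RecordProcessor {
    // Latest processed offset per partition, keyed by TopicPartition so the result can be
    // handed straight to seek()/commit() as the later examples do.
    Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
        Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
        for (ConsumerRecords topicRecords : records.values()) {
            // records() with no arguments returns the records of every partition of this topic
            List<ConsumerRecord> recordsPerTopic = topicRecords.records();
            for (ConsumerRecord record : recordsPerTopic) {
                try {
                    // application-specific handling of record.key()/record.value() goes here
                    processedOffsets.put(record.topicAndPartition(), record.offset());
                } catch (Exception fetchError) {
                    // key()/value()/offset() rethrow the error the broker returned for this record
                }
            }
        }
        return processedOffsets;
    }
}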
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load - * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, - * as controlled by the auto.commit.interval.ms config - *

- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * consumer.subscribe("foo", "bar");
- * boolean isRunning = true;
- * while(isRunning) {
- *   Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
- *   process(records);
- * }
- * consumer.close();
- * }
- * 
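The same settings can also be supplied through the constants defined in ConsumerConfig (removed earlier in this patch) together with the KafkaConsumer(Map) constructor. Note that ConsumerConfig registers the broker list under bootstrap.servers, while the javadoc examples use the older metadata.broker.list name. A minimal sketch, assuming the map constructor accepts Map<String, Object>:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitConfigExample {
    public static void main(String[] args) {
        // Same configuration as the example above, expressed via the ConsumerConfig constants.
        Map<String, Object> config = new HashMap<String, Object>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
        KafkaConsumer consumer = new KafkaConsumer(config);
        consumer.subscribe("foo", "bar");
        // ... poll/process loop as in the example above ...
        consumer.close();
    }
}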
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load - * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using - * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed - * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets - * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances. - *
- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
- *     try {
- *         Map lastConsumedOffsets = process(records);
- *         consumedOffsets.putAll(lastConsumedOffsets);
- *         numRecords += records.size();
- *         // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- *         if(numRecords % commitInterval == 0) 
- *           consumer.commit();
- *     } catch(Exception e) {
- *         try {
- *             // rewind consumer's offsets for failed partitions
- *             // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
- *             List failedPartitions = failedPartitions();   
- *             Map offsetsToRewindTo = new HashMap();
- *             for(TopicPartition failedPartition : failedPartitions) {
- *                 // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
- *                 // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
- *                 offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
- *             }
- *             // seek to new offsets only for partitions that failed the last process()
- *             consumer.seek(offsetsToRewindTo);
- *         } catch(Exception e) {  break; } // rewind failed
- *     }
- * }         
- * consumer.close();
- * }
- * 
- *

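The catch block above rewinds to offsets recorded by process(). A related sketch using the interface's position() and seek() methods instead: checkpoint the fetch positions before handing a batch to process(), and seek back to them if processing fails so the same batch is fetched again on the next poll(). The helper class and method names are illustrative, and it assumes position() returns per-partition Long offsets keyed by TopicPartition:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class ReplayHelper {
    // Call before poll(): remember where the consumer will fetch from next.
    static Map<TopicPartition, Long> checkpoint(Consumer consumer, Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>();
        positions.putAll(consumer.position(partitions));
        return positions;
    }

    // Call if process() fails: the next poll() re-fetches the same batch.
    static void rewind(Consumer consumer, Map<TopicPartition, Long> checkpointed) {
        consumer.seek(checkpointed);
    }
}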
- * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's - * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in - * Kafka. If group management is used, the right place to systematically rewind offsets for every consumer instance is inside the - * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance - * and before consumption restarts post rebalance, so it is the right place to supply the newly rewound offsets to the consumer. If you - * foresee ever needing to reset the consumer's offsets while group management is in use, it is recommended that you always configure the - * consumer with a ConsumerRebalanceCallback that has a flag controlling whether the offset rewind logic runs. This method of rewinding offsets - * is useful if you notice an issue with your message processing after successful consumption and offset commit, and you would like to rewind - * the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case, you would configure each of - * your consumer instances with the offset rewind flag turned on and bounce each consumer instance in a rolling restart fashion. Each restart - * triggers a rebalance, and eventually every consumer instance will have rewound the offsets for the partitions it owns, effectively rewinding - * the offsets for the entire consumer group.

- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props,
- *                                            new ConsumerRebalanceCallback() {
- *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
- *                                                    Map latestCommittedOffsets = consumer.committed(partitions);
- *                                                    if(rewindOffsets)
- *                                                        Map newOffsets = rewindOffsets(latestCommittedOffsets, 100);
- *                                                    consumer.seek(newOffsets);
- *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
- *                                                    consumer.commit();
- *                                                }
- *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
- *                                                private Map rewindOffsets(Map currentOffsets,
- *                                                                                                long numberOfMessagesToRewindBackTo) {
- *                                                    Map newOffsets = new HashMap();
- *                                                    for(Map.Entry offset : currentOffsets.entrySet()) 
- *                                                        newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
- *                                                    return newOffsets;
- *                                                }
- *                                            });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     numRecords += records.size();
- *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- *     if(numRecords % commitInterval == 0) 
- *         consumer.commit(consumedOffsets);
- * }
- * consumer.close();
- * }
- * 
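The ConsumerRebalanceCallback javadoc removed earlier in this patch says the default behaviour is to seek to the last committed offsets when partitions are assigned and to commit synchronously when they are revoked. Spelled out, that default looks roughly like the callback below. It assumes the callback methods are declared with Collection<TopicPartition> parameters (the inline examples here use a TopicPartition... varargs form instead):

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.common.TopicPartition;

final class DefaultLikeRebalanceCallback implements ConsumerRebalanceCallback {
    @Override
    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
        // resume from the offsets last committed to Kafka for the newly assigned partitions
        consumer.seek(consumer.committed(partitions));
    }

    @Override
    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
        // synchronously commit the offsets returned by the last poll() before ownership changes
        consumer.commit(true);
    }
}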
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. - * In this example, it is assumed that the user chooses to store the consumer offsets outside Kafka. This requires the user to - * plug in logic for retrieving the offsets from a custom store and to provide those offsets to the consumer in the ConsumerRebalanceCallback - * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and - * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer. - *

- * Similarly, the user is also required to plug in logic for storing the consumer's offsets in a custom store. The onPartitionsRevoked - * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place - * to commit the offsets for the current set of partitions owned by the consumer. - *

- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(props,
- *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
- *                                                    Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
- *                                                    consumer.seek(lastCommittedOffsets);
- *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
- *                                                    Map offsets = getLastConsumedOffsets(partitions);
- *                                                    commitOffsetsToCustomStore(offsets); 
- *                                                }
- *                                                // following APIs should be implemented by the user for custom offset management
- *                                                private Map getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
- *                                                    return null;
- *                                                }
- *                                                private Map getLastConsumedOffsets(TopicPartition... partitions) { return null; }
- *                                                private void commitOffsetsToCustomStore(Map offsets) {}
- *                                            });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     numRecords += records.size();
- *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- *     if(numRecords % commitInterval == 0) 
- *         commitOffsetsToCustomStore(consumedOffsets);
- * }
- * consumer.close();
- * }
- * 
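getLastCommittedOffsetsFromCustomStore() and commitOffsetsToCustomStore() are left unimplemented in the example above. A deliberately trivial in-memory sketch of their shape; a real custom store would be a database or another durable service, and the class name and the 0L default for unknown partitions are assumptions:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

final class InMemoryOffsetStore {
    private final Map<TopicPartition, Long> store = new ConcurrentHashMap<TopicPartition, Long>();

    // Return the last stored offset per partition, defaulting to 0L for partitions never seen.
    Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        for (TopicPartition partition : partitions) {
            Long offset = store.get(partition);
            offsets.put(partition, offset == null ? 0L : offset);
        }
        return offsets;
    }

    // Overwrite the stored offsets with the latest consumed offsets.
    void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {
        store.putAll(offsets);
    }
}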
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest - * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes - * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. - * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka - * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group - * management is used. - *
- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * // find the last committed offsets for partitions 0,1 of topic foo
- * Map lastCommittedOffsets = consumer.committed(partition0, partition1);
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);        
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     for(TopicPartition partition : partitions) {
- *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- *             isRunning = false;
- *         else
- *             isRunning = true;
- *     }
- * }
- * consumer.commit();
- * consumer.close();
- * }
- * 
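offsetsBeforeTime() is documented in this API as returning the earliest available offsets for timestamp -1 and the latest for -2, and the example above uses the latest offsets as its stopping condition. A sketch that additionally seeks to the earliest offsets so the partitions are replayed from the beginning; it uses the Collection-based signature declared on the Consumer interface (the examples pass partitions as varargs) and raw Map types because the generic parameters are not shown:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BoundedReplay {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // property name as used in the examples above
        KafkaConsumer consumer = new KafkaConsumer(props);

        TopicPartition partition0 = new TopicPartition("foo", 0);
        TopicPartition partition1 = new TopicPartition("foo", 1);
        consumer.subscribe(partition0, partition1);
        List<TopicPartition> partitions = Arrays.asList(partition0, partition1);

        // -1 asks for the earliest available offsets, -2 for the latest (per the javadoc above).
        Map earliestOffsets = consumer.offsetsBeforeTime(-1, partitions);
        Map latestOffsets = consumer.offsetsBeforeTime(-2, partitions);
        consumer.seek(earliestOffsets);
        // ... poll()/process() until the consumed offsets reach latestOffsets ...
        consumer.close();
    }
}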
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest - * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes - * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. - * This example assumes that the user chooses to use custom offset storage. - *
- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);        
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     // commit offsets for partitions 0,1 for topic foo to custom store
- *     commitOffsetsToCustomStore(consumedOffsets);
- *     for(TopicPartition partition : partitions) {
- *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- *             isRunning = false;
- *         else
- *             isRunning = true;
- *     }            
- * }      
- * commitOffsetsToCustomStore(consumedOffsets);   
- * consumer.close();
- * }
- * 
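The MockConsumer removed later in this patch keeps all state in memory and fabricates one record per subscribed partition on each poll(), which makes it a convenient way to exercise loops like the ones above without a broker. A minimal sketch; the class name and iteration count are arbitrary:

import java.util.Map;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.TopicPartition;

public class PollLoopSmokeTest {
    public static void main(String[] args) {
        MockConsumer consumer = new MockConsumer();
        consumer.subscribe(new TopicPartition("foo", 0), new TopicPartition("foo", 1));
        for (int i = 0; i < 3; i++) {
            // poll() returns a map of topic name to ConsumerRecords, one dummy record per partition
            Map records = consumer.poll(100L);
            System.out.println("poll " + i + " returned records for topics " + records.keySet());
        }
        consumer.commit(true); // synchronously records the offsets the mock has handed out
        consumer.close();
    }
}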
- */ -public class KafkaConsumer implements Consumer { - - private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); - - private final long metadataFetchTimeoutMs; - private final long totalMemorySize; - private final Metrics metrics; - private final Set subscribedTopics; - private final Set subscribedPartitions; - - /** - * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings - * are documented here. Values can be - * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the - * string "42" or the integer 42). - *

- * Valid configuration strings are documented at {@link ConsumerConfig} - * @param configs The consumer configs - */ - public KafkaConsumer(Map configs) { - this(new ConsumerConfig(configs), null); - } - - /** - * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback} - * implementation - *

- * Valid configuration strings are documented at {@link ConsumerConfig} - * @param configs The consumer configs - * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of - * every rebalance operation. - */ - public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(configs), callback); - } - - /** - * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. - * Valid configuration strings are documented at {@link ConsumerConfig} - */ - public KafkaConsumer(Properties properties) { - this(new ConsumerConfig(properties), null); - } - - /** - * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a - * {@link ConsumerRebalanceCallback} implementation. - *

- * Valid configuration strings are documented at {@link ConsumerConfig} - * @param properties The consumer configuration properties - * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of - * every rebalance operation. - */ - public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(properties), callback); - } - - private KafkaConsumer(ConsumerConfig config) { - this(config, null); - } - - private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) { - log.trace("Starting the Kafka consumer"); - subscribedTopics = new HashSet(); - subscribedPartitions = new HashSet(); - this.metrics = new Metrics(new MetricConfig(), - Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")), - new SystemTime()); - this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); - List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - config.logUnused(); - log.debug("Kafka consumer started"); - } - - /** - * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality - *

- * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and - * will trigger a rebalance operation if one of the following events trigger - - *

    - *
  • Number of partitions change for any of the subscribed list of topics - *
  • Topic is created or deleted - *
  • An existing member of the consumer group dies - *
  • A new member is added to an existing consumer group via the join API - *
- * @param topics A variable list of topics that the consumer wants to subscribe to - */ - @Override - public void subscribe(String... topics) { - if(subscribedPartitions.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(String topic:topics) - subscribedTopics.add(topic); - // TODO: trigger a rebalance operation - } - - /** - * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such, - * there will be no rebalance operation triggered when group membership or cluster and topic metadata change. - *

- * @param partitions Partitions to incrementally subscribe to - */ - @Override - public void subscribe(TopicPartition... partitions) { - if(subscribedTopics.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(TopicPartition partition:partitions) - subscribedPartitions.add(partition); - } - - /** - * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will not be returned - * from the next {@link #poll(long) poll()} onwards - * @param topics Topics to unsubscribe from - */ - public void unsubscribe(String... topics) { - // throw an exception if the topic was never subscribed to - for(String topic:topics) { - if(!subscribedTopics.contains(topic)) - throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + - " to unsubscribe(" + topic + ")"); - subscribedTopics.remove(topic); - } - // TODO trigger a rebalance operation - } - - /** - * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next - * {@link #poll(long) poll()} onwards - * @param partitions Partitions to unsubscribe from - */ - public void unsubscribe(TopicPartition... partitions) { - // throw an exception if the partition was never subscribed to - for(TopicPartition partition:partitions) { - if(!subscribedPartitions.contains(partition)) - throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + - partition.topic() + "," + partition.partition() + ") should be called prior" + - " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); - subscribedPartitions.remove(partition); - } - // trigger a rebalance operation - } - - /** - * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to - * any topics or partitions before polling for data. - *

- * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)} - * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and - * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset - * using {@link #commit(Map, boolean) commit(offsets, sync)} - * for the subscribed list of partitions. - * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative - * @return map of topic to records since the last fetch for the subscribed list of topics and partitions - */ - @Override - public Map poll(long timeout) { - // TODO Auto-generated method stub - return null; - } - - /** - * Commits the specified offsets for the specified list of topics and partitions to Kafka. - *

- * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance - * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. - * @param offsets The list of offsets per partition that should be committed to Kafka. - * @param sync If true, commit will block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false. - */ - @Override - public OffsetMetadata commit(Map offsets, boolean sync) { - throw new UnsupportedOperationException(); - } - - /** - * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and - * partitions. - *

- * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance - * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. - * @param sync If true, commit will block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false. - */ - @Override - public OffsetMetadata commit(boolean sync) { - throw new UnsupportedOperationException(); - } - - /** - * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked - * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is - * arbitrarily used in the middle of consumption, to reset the fetch offsets - */ - @Override - public void seek(Map offsets) { - } - - /** - * Returns the fetch position of the next message for the specified topic partition to be used on the next {@link #poll(long) poll()} - * @param partitions Partitions for which the fetch position will be returned - * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()} - */ - public Map position(Collection partitions) { - return null; - } - - /** - * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset - * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data. - * @param partitions The list of partitions to return the last committed offset for - * @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)} - */ - @Override - public Map committed(Collection partitions) { - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - } - - /** - * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact - * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages - * returned by the consumer. - * @param partitions The list of partitions for which the offsets are returned - * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. - * @return The offsets per partition before the specified timestamp. - */ - public Map offsetsBeforeTime(long timestamp, Collection partitions) { - return null; - } - - @Override - public Map metrics() { - return Collections.unmodifiableMap(this.metrics.metrics()); - } - - @Override - public void close() { - log.trace("Closing the Kafka consumer."); - subscribedTopics.clear(); - subscribedPartitions.clear(); - this.metrics.close(); - log.debug("The Kafka consumer has closed."); - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java deleted file mode 100644 index c3aad3b..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.TopicPartition; - -/** - * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. - * This class is not threadsafe - *

- * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it - * needs to communicate with. Failure to close the consumer after use will leak these resources. - */ -public class MockConsumer implements Consumer { - - private final Set subscribedPartitions; - private final Set subscribedTopics; - private final Map committedOffsets; - private final Map consumedOffsets; - - public MockConsumer() { - subscribedPartitions = new HashSet(); - subscribedTopics = new HashSet(); - committedOffsets = new HashMap(); - consumedOffsets = new HashMap(); - } - - @Override - public void subscribe(String... topics) { - if(subscribedPartitions.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(String topic : topics) { - subscribedTopics.add(topic); - } - } - - @Override - public void subscribe(TopicPartition... partitions) { - if(subscribedTopics.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(TopicPartition partition : partitions) { - subscribedPartitions.add(partition); - consumedOffsets.put(partition, 0L); - } - } - - public void unsubscribe(String... topics) { - // throw an exception if the topic was never subscribed to - for(String topic:topics) { - if(!subscribedTopics.contains(topic)) - throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + - " to unsubscribe(" + topic + ")"); - subscribedTopics.remove(topic); - } - } - - public void unsubscribe(TopicPartition... partitions) { - // throw an exception if the partition was never subscribed to - for(TopicPartition partition:partitions) { - if(!subscribedPartitions.contains(partition)) - throw new IllegalStateException("Partition " + partition + " was never subscribed to. 
subscribe(new TopicPartition(" + - partition.topic() + "," + partition.partition() + ") should be called prior" + - " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); - subscribedPartitions.remove(partition); - committedOffsets.remove(partition); - consumedOffsets.remove(partition); - } - } - - @Override - public Map poll(long timeout) { - // hand out one dummy record, 1 per topic - Map> records = new HashMap>(); - Map recordMetadata = new HashMap(); - for(TopicPartition partition : subscribedPartitions) { - // get the last consumed offset - long messageSequence = consumedOffsets.get(partition); - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - ObjectOutputStream outputStream; - try { - outputStream = new ObjectOutputStream(byteStream); - outputStream.writeLong(messageSequence++); - outputStream.close(); - } catch (IOException e) { - e.printStackTrace(); - } - List recordsForTopic = records.get(partition.topic()); - if(recordsForTopic == null) { - recordsForTopic = new ArrayList(); - records.put(partition.topic(), recordsForTopic); - } - recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence)); - consumedOffsets.put(partition, messageSequence); - } - for(Entry> recordsPerTopic : records.entrySet()) { - Map> recordsPerPartition = new HashMap>(); - for(ConsumerRecord record : recordsPerTopic.getValue()) { - List recordsForThisPartition = recordsPerPartition.get(record.partition()); - if(recordsForThisPartition == null) { - recordsForThisPartition = new ArrayList(); - recordsPerPartition.put(record.partition(), recordsForThisPartition); - } - recordsForThisPartition.add(record); - } - recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition)); - } - return recordMetadata; - } - - @Override - public OffsetMetadata commit(Map offsets, boolean sync) { - if(!sync) - return null; - for(Entry partitionOffset : offsets.entrySet()) { - committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); - } - return new OffsetMetadata(committedOffsets, null); - } - - @Override - public OffsetMetadata commit(boolean sync) { - if(!sync) - return null; - return commit(consumedOffsets, sync); - } - - @Override - public void seek(Map offsets) { - // change the fetch offsets - for(Entry partitionOffset : offsets.entrySet()) { - consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); - } - } - - @Override - public Map committed(Collection partitions) { - Map offsets = new HashMap(); - for(TopicPartition partition : partitions) { - offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition)); - } - return offsets; - } - - @Override - public Map position(Collection partitions) { - Map positions = new HashMap(); - for(TopicPartition partition : partitions) { - positions.put(partition, consumedOffsets.get(partition)); - } - return positions; - } - - @Override - public Map offsetsBeforeTime(long timestamp, - Collection partitions) { - throw new UnsupportedOperationException(); - } - - @Override - public Map metrics() { - return null; - } - - @Override - public void close() { - // unsubscribe from all partitions - TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()]; - unsubscribe(subscribedPartitions.toArray(allPartitions)); - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
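
MockConsumer above tracks subscriptions and offsets purely in memory: subscribing to a partition seeds its consumed offset at 0, each poll() hands out one dummy record per partition and advances that offset, and commit(true) copies the consumed offsets into the committed map. A small JUnit 4 style sketch of exercising that behaviour (the class, test and topic names are hypothetical):

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

public class MockConsumerSketchTest {

    @Test
    public void pollAdvancesOffsetsAndCommitRecordsThem() {
        MockConsumer consumer = new MockConsumer();
        TopicPartition tp = new TopicPartition("foo", 0);   // topic/partition are illustrative
        consumer.subscribe(tp);                             // partition subscription seeds the offset at 0

        consumer.poll(100);                                 // hands out one dummy record per partition

        Map positions = consumer.position(Collections.singleton(tp));
        assertEquals(Long.valueOf(1), positions.get(tp));   // consumed offset moved past the dummy record

        consumer.commit(true);                              // sync commit copies consumed into committed
        Map committed = consumer.committed(Collections.singleton(tp));
        assertEquals(Long.valueOf(1), committed.get(tp));

        consumer.close();                                   // close() unsubscribes from all partitions
    }
}
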
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java deleted file mode 100644 index ea423ad..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import java.util.Map; - -import org.apache.kafka.common.TopicPartition; - -/** - * The metadata for an offset commit that has been acknowledged by the server - */ -public final class OffsetMetadata { - - private final Map offsets; - private final Map errors; - - public OffsetMetadata(Map offsets, Map errors) { - super(); - this.offsets = offsets; - this.errors = errors; - } - - public OffsetMetadata(Map offsets) { - this(offsets, null); - } - - /** - * The offset of the record in the topic/partition. - */ - public long offset(TopicPartition partition) { - if(this.errors != null) - throw errors.get(partition); - return offsets.get(partition); - } - - /** - * @return The exception corresponding to the error code returned by the server - */ - public Exception error(TopicPartition partition) { - if(errors != null) - return errors.get(partition); - else - return null; - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f1def50..90cacbd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -13,6 +13,7 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,7 +44,6 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -118,7 +118,7 @@ public class KafkaProducer implements Producer { config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time); - List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); this.sender = new Sender(new Selector(this.metrics, time), this.metadata, @@ -150,6 +150,28 @@ public class KafkaProducer implements Producer { } } + private static 
List parseAndValidateAddresses(List urls) { + List addresses = new ArrayList(); + for (String url : urls) { + if (url != null && url.length() > 0) { + String[] pieces = url.split(":"); + if (pieces.length != 2) + throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + try { + InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); + if (address.isUnresolved()) + throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url); + addresses.add(address); + } catch (NumberFormatException e) { + throw new ConfigException("Invalid port in metadata.broker.list: " + url); + } + } + } + if (addresses.size() < 1) + throw new ConfigException("No bootstrap urls given in metadata.broker.list."); + return addresses; + } + /** * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} */ diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java deleted file mode 100644 index cb33e34..0000000 --- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
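
With ClientUtils being removed, the bootstrap address validation now lives as a private helper inside KafkaProducer: each entry must be host:port with a numeric port and a resolvable host, and at least one entry must be supplied, otherwise a ConfigException is thrown. A hedged sketch of the resulting fail-fast behaviour at construction time; it assumes the Properties-based KafkaProducer constructor, and the malformed address is intentional.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;

public class BootstrapValidationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // intentionally malformed: no port, so the split on ':' yields one piece, not two
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
        try {
            new KafkaProducer(props).close();               // not expected to get this far
            System.out.println("unexpected: producer construction succeeded");
        } catch (KafkaException expected) {
            // the validation above raises a ConfigException (a KafkaException subtype),
            // possibly wrapped, before any network activity happens
            System.out.println("rejected as expected: " + expected.getMessage());
        }
    }
}
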
- */ -package org.apache.kafka.common.utils; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.config.ConfigException; - -public class ClientUtils { - public static List parseAndValidateAddresses(List urls) { - List addresses = new ArrayList(); - for (String url : urls) { - if (url != null && url.length() > 0) { - String[] pieces = url.split(":"); - if (pieces.length != 2) - throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - try { - InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); - if (address.isUnresolved()) - throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url); - addresses.add(address); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid port in metadata.broker.list: " + url); - } - } - } - if (addresses.size() < 1) - throw new ConfigException("No bootstrap urls given in metadata.broker.list."); - return addresses; - } -} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java deleted file mode 100644 index 0548fb4..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. -*/ -package org.apache.kafka.clients.consumer; - -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; - -import org.apache.kafka.common.TopicPartition; -import org.junit.Test; - -/** - * TODO: Clean this after the consumer implementation is complete. Until then, it is useful to write some sample test code using the new APIs - * - */ -public class ConsumerExampleTest { - /** - * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load - * balancing and failure detection. 
This example assumes that the offsets are stored in Kafka and are automatically committed periodically, - * as controlled by the auto.commit.interval.ms config - */ -// @Test -// public void testConsumerGroupManagementWithAutoOffsetCommits() { -// Properties props = new Properties(); -// props.put("metadata.broker.list", "localhost:9092"); -// props.put("group.id", "test"); -// props.put("session.timeout.ms", "1000"); -// props.put("auto.commit.enable", "true"); -// props.put("auto.commit.interval.ms", "10000"); -// KafkaConsumer consumer = new KafkaConsumer(props); -// // subscribe to some topics -// consumer.subscribe("foo", "bar"); -// boolean isRunning = true; -// while(isRunning) { -// Map records = consumer.poll(100); -// process(records); -// } -// consumer.close(); -// } - - /** - * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load - * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using - * either the commit() or commitAsync() APIs. This example also demonstrates rewinding the consumer's offsets if processing of consumed - * messages fails. - */ -// @Test -// public void testConsumerGroupManagementWithManualOffsetCommit() { -// Properties props = new Properties(); -// props.put("metadata.broker.list", "localhost:9092"); -// props.put("group.id", "test"); -// props.put("session.timeout.ms", "1000"); -// props.put("auto.commit.enable", "false"); -// KafkaConsumer consumer = new KafkaConsumer(props); -// // subscribe to some topics -// consumer.subscribe("foo", "bar"); -// int commitInterval = 100; -// int numRecords = 0; -// boolean isRunning = true; -// Map consumedOffsets = new HashMap(); -// while(isRunning) { -// Map records = consumer.poll(100); -// try { -// Map lastConsumedOffsets = process(records); -// consumedOffsets.putAll(lastConsumedOffsets); -// numRecords += records.size(); -// // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance -// if(numRecords % commitInterval == 0) -// consumer.commit(true); -// } catch(Exception e) { -// // rewind consumer's offsets for failed partitions -// List failedPartitions = getFailedPartitions(); -// Map offsetsToRewindTo = new HashMap(); -// for(TopicPartition failedPartition : failedPartitions) { -// // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset -// // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to. -// offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition)); -// } -// // seek to new offsets only for partitions that failed the last process() -// consumer.seek(offsetsToRewindTo); -// } -// } -// consumer.close(); -// } - - private List getFailedPartitions() { return null; } - - /** - * This example demonstrates the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. - * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to - * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback - * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and - * before the consumption restarts post rebalance. 
This is the right place to supply offsets from a custom store to the consumer. - */ -// @Test -// public void testConsumerRebalanceWithCustomOffsetStore() { -// Properties props = new Properties(); -// props.put("metadata.broker.list", "localhost:9092"); -// props.put("group.id", "test"); -// props.put("session.timeout.ms", "1000"); -// props.put("auto.commit.enable", "true"); -// props.put("auto.commit.interval.ms", "10000"); -// KafkaConsumer consumer = new KafkaConsumer(props, -// new ConsumerRebalanceCallback() { -// public void onPartitionsAssigned(Consumer consumer, Collection partitions) { -// Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions); -// consumer.seek(lastCommittedOffsets); -// } -// public void onPartitionsRevoked(Consumer consumer, Collection partitions) { -// Map offsets = getLastConsumedOffsets(partitions); // implemented by the user -// commitOffsetsToCustomStore(offsets); // implemented by the user -// } -// private Map getLastCommittedOffsetsFromCustomStore(Collection partitions) { -// return null; -// } -// private Map getLastConsumedOffsets(Collection partitions) { return null; } -// private void commitOffsetsToCustomStore(Map offsets) {} -// }); -// // subscribe to topics -// consumer.subscribe("foo", "bar"); -// int commitInterval = 100; -// int numRecords = 0; -// boolean isRunning = true; -// while(isRunning) { -// Map records = consumer.poll(100); -// Map consumedOffsets = process(records); -// numRecords += records.size(); -// // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance -// if(numRecords % commitInterval == 0) -// commitOffsetsToCustomStore(consumedOffsets); -// } -// consumer.close(); -// } - - /** - * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage. - * In this example, the assumption made is that the user chooses to use Kafka based offset management. 
- */ -// @Test -// public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() { -// Properties props = new Properties(); -// props.put("metadata.broker.list", "localhost:9092"); -// props.put("group.id", "test"); -// props.put("session.timeout.ms", "1000"); -// props.put("auto.commit.enable", "false"); -// KafkaConsumer consumer = new KafkaConsumer(props, -// new ConsumerRebalanceCallback() { -// boolean rewindOffsets = true; -// public void onPartitionsAssigned(Consumer consumer, Collection partitions) { -// if(rewindOffsets) { -// Map latestCommittedOffsets = consumer.committed(null); -// Map newOffsets = rewindOffsets(latestCommittedOffsets, 100); -// consumer.seek(newOffsets); -// } -// } -// public void onPartitionsRevoked(Consumer consumer, Collection partitions) { -// consumer.commit(true); -// } -// // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages -// private Map rewindOffsets(Map currentOffsets, -// long numberOfMessagesToRewindBackTo) { -// Map newOffsets = new HashMap(); -// for(Map.Entry offset : currentOffsets.entrySet()) { -// newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo); -// } -// return newOffsets; -// } -// }); -// // subscribe to topics -// consumer.subscribe("foo", "bar"); -// int commitInterval = 100; -// int numRecords = 0; -// boolean isRunning = true; -// while(isRunning) { -// Map records = consumer.poll(100); -// Map consumedOffsets = process(records); -// numRecords += records.size(); -// // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance -// if(numRecords % commitInterval == 0) -// commitOffsetsToCustomStore(consumedOffsets); -// } -// consumer.close(); -// } - - /** - * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest - * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes - * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. - * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka - * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does failure detection with group - * management. 
- */ -// @Test -// public void testConsumerWithKafkaBasedOffsetManagement() { -// Properties props = new Properties(); -// props.put("metadata.broker.list", "localhost:9092"); -// props.put("group.id", "test"); -// props.put("auto.commit.enable", "true"); -// props.put("auto.commit.interval.ms", "10000"); -// KafkaConsumer consumer = new KafkaConsumer(props); -// // subscribe to some partitions of topic foo -// TopicPartition partition0 = new TopicPartition("foo", 0); -// TopicPartition partition1 = new TopicPartition("foo", 1); -// TopicPartition[] partitions = new TopicPartition[2]; -// partitions[0] = partition0; -// partitions[1] = partition1; -// consumer.subscribe(partitions); -// // find the last committed offsets for partitions 0,1 of topic foo -// Map lastCommittedOffsets = consumer.committed(null); -// // seek to the last committed offsets to avoid duplicates -// consumer.seek(lastCommittedOffsets); -// // find the offsets of the latest available messages to know where to stop consumption -// Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null); -// boolean isRunning = true; -// while(isRunning) { -// Map records = consumer.poll(100); -// Map consumedOffsets = process(records); -// for(TopicPartition partition : partitions) { -// if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition)) -// isRunning = false; -// else -// isRunning = true; -// } -// } -// consumer.close(); -// } - - /** - * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest - * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes - * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. - * This example assumes that the user chooses to use custom offset storage. 
- */ - @Test - public void testConsumerWithCustomOffsetManagement() { -// Properties props = new Properties(); -// props.put("metadata.broker.list", "localhost:9092"); -// KafkaConsumer consumer = new KafkaConsumer(props); -// // subscribe to some partitions of topic foo -// TopicPartition partition0 = new TopicPartition("foo", 0); -// TopicPartition partition1 = new TopicPartition("foo", 1); -// TopicPartition[] partitions = new TopicPartition[2]; -// partitions[0] = partition0; -// partitions[1] = partition1; -// consumer.subscribe(partitions); -// Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(); -// // seek to the last committed offsets to avoid duplicates -// consumer.seek(lastCommittedOffsets); -// // find the offsets of the latest available messages to know where to stop consumption -// Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, null); -// boolean isRunning = true; -// while(isRunning) { -// Map records = consumer.poll(100); -// Map consumedOffsets = process(records); -// // commit offsets for partitions 0,1 for topic foo to custom store -// commitOffsetsToCustomStore(consumedOffsets); -// for(TopicPartition partition : partitions) { -// if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition)) -// isRunning = false; -// else -// isRunning = true; -// } -// } -// consumer.close(); - } - - private Map getLastCommittedOffsetsFromCustomStore() { return null; } - private void commitOffsetsToCustomStore(Map consumedOffsets) {} - private Map process(Map records) { - Map processedOffsets = new HashMap(); - for(Entry recordMetadata : records.entrySet()) { - List recordsPerTopic = recordMetadata.getValue().records(); - for(int i = 0;i < recordsPerTopic.size();i++) { - ConsumerRecord record = recordsPerTopic.get(i); - // process record - try { - processedOffsets.put(record.topicAndPartition(), record.offset()); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - return processedOffsets; - } -} diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index c032d26..b306a01 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -106,6 +106,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // useful for tracking migration of consumers to store offsets in kafka private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS) + private val rebalancesRate = newMeter(config.clientId + "-kafkaConsumerRebalancesPerMin", "rebalances", TimeUnit.MINUTES) + private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-kafkaConsumerRebalanceTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) val consumerIdString = { var consumerUuid : String = null @@ -575,35 +577,38 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def syncedRebalance() { rebalanceLock synchronized { - if(isShuttingDown.get()) { - return - } else { - for (i <- 0 until config.rebalanceMaxRetries) { - info("begin rebalancing consumer " + consumerIdString + " try #" + i) - var done = false - var cluster: Cluster = null - try { - cluster = getCluster(zkClient) - done = rebalance(cluster) - } catch { - case e: Throwable => - /** occasionally, we may hit a ZK 
exception because the ZK state is changing while we are iterating. - * For example, a ZK node can disappear between the time we get all children and the time we try to get - * the value of a child. Just let this go since another rebalance will be triggered. - **/ - info("exception during rebalance ", e) - } - info("end rebalancing consumer " + consumerIdString + " try #" + i) - if (done) { - return - } else { - /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should - * clear the cache */ - info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") + rebalancesRate.mark() + rebalanceTimer.time { + if(isShuttingDown.get()) { + return + } else { + for (i <- 0 until config.rebalanceMaxRetries) { + info("begin rebalancing consumer " + consumerIdString + " try #" + i) + var done = false + var cluster: Cluster = null + try { + cluster = getCluster(zkClient) + done = rebalance(cluster) + } catch { + case e: Throwable => + /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. + * For example, a ZK node can disappear between the time we get all children and the time we try to get + * the value of a child. Just let this go since another rebalance will be triggered. + **/ + info("exception during rebalance ", e) + } + info("end rebalancing consumer " + consumerIdString + " try #" + i) + if (done) { + return + } else { + /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should + * clear the cache */ + info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") + } + // stop all fetchers and clear all the queues to avoid data duplication + closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) + Thread.sleep(config.rebalanceBackoffMs) } - // stop all fetchers and clear all the queues to avoid data duplication - closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) - Thread.sleep(config.rebalanceBackoffMs) } } }
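
The Scala change above instruments consumer rebalances with a per-minute rate meter and a KafkaTimer wrapped around the whole syncedRebalance() body. Below is a rough sketch of the same meter-plus-timer pattern, assuming the Yammer metrics 2.x API that Kafka core's KafkaMetricsGroup wraps; the class, metric names and Runnable hook are illustrative.

import java.util.concurrent.TimeUnit;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;

public class RebalanceMetricsSketch {

    private final Meter rebalancesRate =
            Metrics.newMeter(RebalanceMetricsSketch.class, "RebalancesPerMin", "rebalances", TimeUnit.MINUTES);
    private final Timer rebalanceTimer =
            Metrics.newTimer(RebalanceMetricsSketch.class, "RebalanceTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);

    public void syncedRebalance(Runnable doRebalance) {
        rebalancesRate.mark();                    // one tick per syncedRebalance() call, rated per minute
        TimerContext ctx = rebalanceTimer.time(); // start timing, like KafkaTimer.time { ... } above
        try {
            doRebalance.run();                    // the actual rebalance logic would run here
        } finally {
            ctx.stop();                           // record the elapsed time
        }
    }
}
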