diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fe93afa..18bcc90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -57,11 +57,7 @@ import org.slf4j.LoggerFactory;
  *         for(int i = 0;i < recordsPerTopic.size();i++) {
  *             ConsumerRecord record = recordsPerTopic.get(i);
  *             // process record
- *             try {
- *                 processedOffsets.put(record.topicAndpartition(), record.offset());
- *             } catch (Exception e) {
- *                 e.printStackTrace();
- *             }
+ *             processedOffsets.put(record.partition(), record.offset());
  *         }
  *     }
  *     return processedOffsets;
@@ -84,7 +80,7 @@ import org.slf4j.LoggerFactory;
  * consumer.subscribe("foo", "bar");
  * boolean isRunning = true;
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
  *     process(records);
  * }
  * consumer.close();
@@ -92,7 +88,7 @@ import org.slf4j.LoggerFactory;
  * 
  * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
  * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using
- * the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
+ * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
  * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
  * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
  * 
@@ -109,14 +105,14 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
  *     try {
  *         Map lastConsumedOffsets = process(records);
  *         consumedOffsets.putAll(lastConsumedOffsets);
  *         numRecords += records.size();
  *         // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
  *         if(numRecords % commitInterval == 0) 
- *           consumer.commit(false);
+ *           consumer.commit();
  *     } catch(Exception e) {
  *         try {
  *             // rewind consumer's offsets for failed partitions
@@ -159,14 +155,14 @@ import org.slf4j.LoggerFactory;
  * KafkaConsumer consumer = new KafkaConsumer(props,
  *                                            new ConsumerRebalanceCallback() {
  *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
  *                                                    Map latestCommittedOffsets = consumer.committed(partitions);
  *                                                    if(rewindOffsets)
  *                                                        Map newOffsets = rewindOffsets(latestCommittedOffsets, 100);
  *                                                    consumer.seek(newOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
- *                                                    consumer.commit(true);
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                    consumer.commit();
  *                                                }
  *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
  *                                                private Map rewindOffsets(Map currentOffsets,
@@ -183,15 +179,14 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
  *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
  *     if(numRecords % commitInterval == 0) 
- *         consumer.commit(consumedOffsets, true);
+ *         consumer.commit(consumedOffsets);
  * }
- * consumer.commit(true);
  * consumer.close();
  * }
  * 
@@ -213,19 +208,19 @@ import org.slf4j.LoggerFactory;
  * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
  * KafkaConsumer consumer = new KafkaConsumer(props,
  *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
  *                                                    Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
  *                                                    consumer.seek(lastCommittedOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
  *                                                    Map offsets = getLastConsumedOffsets(partitions);
  *                                                    commitOffsetsToCustomStore(offsets);
  *                                                }
  *                                                // following APIs should be implemented by the user for custom offset management
- *                                                private Map getLastCommittedOffsetsFromCustomStore(Collection partitions) {
+ *                                                private Map getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
  *                                                    return null;
  *                                                }
- *                                                private Map getLastConsumedOffsets(Collection partitions) { return null; }
+ *                                                private Map getLastConsumedOffsets(TopicPartition... partitions) { return null; }
  *                                                private void commitOffsetsToCustomStore(Map offsets) {}
  *                                            });
  * consumer.subscribe("foo", "bar");
@@ -234,7 +229,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -242,7 +237,6 @@ import org.slf4j.LoggerFactory;
  *     if(numRecords % commitInterval == 0) 
  *         commitOffsetsToCustomStore(consumedOffsets);
  * }
- * consumer.commit(true);
  * consumer.close();
  * }
  * 
@@ -268,15 +262,15 @@ import org.slf4j.LoggerFactory;
  * partitions[1] = partition1;
  * consumer.subscribe(partitions);
  * // find the last committed offsets for partitions 0,1 of topic foo
- * Map lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
+ * Map lastCommittedOffsets = consumer.committed(partition0, partition1);
  * // seek to the last committed offsets to avoid duplicates
  * consumer.seek(lastCommittedOffsets);
  * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
+ * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     for(TopicPartition partition : partitions) {
@@ -286,7 +280,7 @@ import org.slf4j.LoggerFactory;
  *         isRunning = true;
  *     }
  * }
- * consumer.commit(true);
+ * consumer.commit();
  * consumer.close();
  * }
  * 
@@ -310,11 +304,11 @@ import org.slf4j.LoggerFactory;
  * // seek to the last committed offsets to avoid duplicates
  * consumer.seek(lastCommittedOffsets);
  * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
+ * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     // commit offsets for partitions 0,1 for topic foo to custom store
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
index 29ad25e..0548fb4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
@@ -53,8 +53,9 @@ public class ConsumerExampleTest {
 
     /**
      * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
-     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using the
-     * commit() API. This example also demonstrates rewinding the consumer's offsets if processing of consumed messages fails.
+     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using
+     * either the commit() or commitAsync() APIs. This example also demonstrates rewinding the consumer's offsets if processing of consumed
+     * messages fails.
      */
 //    @Test
 //    public void testConsumerGroupManagementWithManualOffsetCommit() {
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c032d26..284ee7b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -106,6 +106,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // useful for tracking migration of consumers to store offsets in kafka
   private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
   private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
+  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-kafkaConsumerRebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -575,35 +576,37 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def syncedRebalance() {
     rebalanceLock synchronized {
-      if(isShuttingDown.get()) {
-        return
-      } else {
-        for (i <- 0 until config.rebalanceMaxRetries) {
-          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-          var done = false
-          var cluster: Cluster = null
-          try {
-            cluster = getCluster(zkClient)
-            done = rebalance(cluster)
-          } catch {
-            case e: Throwable =>
-              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-                * For example, a ZK node can disappear between the time we get all children and the time we try to get
-                * the value of a child. Just let this go since another rebalance will be triggered.
-                **/
-              info("exception during rebalance ", e)
-          }
-          info("end rebalancing consumer " + consumerIdString + " try #" + i)
-          if (done) {
-            return
-          } else {
-            /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
-             * clear the cache */
-            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+      rebalanceTimer.time {
+        if(isShuttingDown.get()) {
+          return
+        } else {
+          for (i <- 0 until config.rebalanceMaxRetries) {
+            info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+            var done = false
+            var cluster: Cluster = null
+            try {
+              cluster = getCluster(zkClient)
+              done = rebalance(cluster)
+            } catch {
+              case e: Throwable =>
+                /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                  * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                  * the value of a child. Just let this go since another rebalance will be triggered.
+                  **/
+                info("exception during rebalance ", e)
+            }
+            info("end rebalancing consumer " + consumerIdString + " try #" + i)
+            if (done) {
+              return
+            } else {
+              /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+               * clear the cache */
+              info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+            }
+            // stop all fetchers and clear all the queues to avoid data duplication
+            closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+            Thread.sleep(config.rebalanceBackoffMs)
           }
-          // stop all fetchers and clear all the queues to avoid data duplication
-          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-          Thread.sleep(config.rebalanceBackoffMs)
         }
       }
     }
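
Note: across the javadoc examples above, the patch converges on one usage pattern: poll() takes an explicit TimeUnit, commit() loses its boolean sync flag, and the rebalance callbacks receive TopicPartition varargs. The sketch below restates that manual-commit loop in one place. It is only a sketch against the prototype API shown in this patch: the class name, the process() stub, the raw Map types, and the "group.id" config key are illustrative assumptions, and it is not expected to compile against a released Kafka client.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    // Minimal manual-commit loop following the updated javadoc examples.
    public class ManualCommitLoopSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("group.id", "test");                 // assumed config key
            props.put("enable.auto.commit", "false");      // offsets are committed by hand below
            KafkaConsumer consumer = new KafkaConsumer(props);
            consumer.subscribe("foo", "bar");

            Map consumedOffsets = new HashMap();
            long numRecords = 0;
            long commitInterval = 100;
            boolean isRunning = true;
            while (isRunning) {
                // The poll timeout now carries an explicit unit.
                Map records = consumer.poll(100, TimeUnit.MILLISECONDS);
                // process() is a user-supplied stand-in that returns the offsets it handled.
                Map lastConsumedOffsets = process(records);
                consumedOffsets.putAll(lastConsumedOffsets);
                numRecords += records.size();
                // Synchronous commit; the boolean sync flag is gone after this patch.
                if (numRecords % commitInterval == 0)
                    consumer.commit();
            }
            consumer.close();
        }

        // Illustrative processing stub; a real application would iterate the records
        // and track the last offset seen per partition.
        private static Map process(Map records) {
            return new HashMap();
        }
    }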