From a76cc398a6643466c148dab92fb2b7f85d3a55b8 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 29 Apr 2015 16:22:42 -0700 Subject: [PATCH 1/6] KAFKA-2123: Add callbacks and retries for offset commits in new consumer. --- .../apache/kafka/clients/consumer/Consumer.java | 10 ++ .../clients/consumer/ConsumerCommitCallback.java | 29 ++++ .../kafka/clients/consumer/ConsumerConfig.java | 11 +- .../kafka/clients/consumer/KafkaConsumer.java | 54 +++++- .../kafka/clients/consumer/MockConsumer.java | 19 ++- .../clients/consumer/internals/Coordinator.java | 113 ++++++++----- .../ConsumerCoordinatorNotAvailableException.java | 40 +++++ .../errors/NotCoordinatorForConsumerException.java | 40 +++++ .../errors/OffsetLoadInProgressException.java | 40 +++++ .../org/apache/kafka/common/protocol/Errors.java | 6 +- .../consumer/internals/CoordinatorTest.java | 184 ++++++++++++++++++--- .../scala/integration/kafka/api/ConsumerTest.scala | 73 ++++++-- 12 files changed, 530 insertions(+), 89 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 8f587bc..bcd886f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -64,11 +64,21 @@ public interface Consumer extends Closeable { public void commit(CommitType commitType); /** + * @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback) + */ + public void 
commit(CommitType commitType, ConsumerCommitCallback callback); + + /** * @see KafkaConsumer#commit(Map, CommitType) */ public void commit(Map offsets, CommitType commitType); /** + * @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback) + */ + public void commit(Map offsets, CommitType commitType, ConsumerCommitCallback callback); + + /** * @see KafkaConsumer#seek(TopicPartition, long) */ public void seek(TopicPartition partition, long offset); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java new file mode 100644 index 0000000..b99bef1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +/** + * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback + * may be executed in any thread calling {@link Consumer#poll(long) poll()}. 
+ */ +public interface ConsumerCommitCallback { + + /** + * A callback method the user can implement to provide asynchronous handling of commit request completion. + * This method will be called when the commit request sent to the server has been acknowledged. + * + * @param exception The exception thrown during processing of the request, or null if the commit completed successfully + */ + void onCommitCompleted(Exception exception); +} + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index bdff518..dd78721 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -156,6 +156,10 @@ public class ConsumerConfig extends AbstractConfig { public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface."; + /** commit.retries */ + public static final String COMMIT_RETRIES_CONFIG = "commit.retries"; + private static final String COMMIT_RETRIES_DOC = "Number of times to retry an offset commit request if it fails with potentially transient errors. This is no different than if the client resent the request upon receiving an error. 
A negative value indicates the consumer should retry indefinitely."; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -277,7 +281,12 @@ public class ConsumerConfig extends AbstractConfig { .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, - VALUE_DESERIALIZER_CLASS_DOC); + VALUE_DESERIALIZER_CLASS_DOC) + .define(COMMIT_RETRIES_CONFIG, + Type.INT, + -1, + Importance.MEDIUM, + COMMIT_RETRIES_DOC); } public static Map addDeserializerToConfig(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d301be4..02327b8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -484,6 +484,7 @@ public class KafkaConsumer implements Consumer { config.getString(ConsumerConfig.GROUP_ID_CONFIG), this.retryBackoffMs, config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(ConsumerConfig.COMMIT_RETRIES_CONFIG), config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), this.metadata, this.subscriptions, @@ -677,15 +678,17 @@ public class KafkaConsumer implements Consumer { * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API * should not be used. *

- * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails. - * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until - * the commit succeeds. - * + * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails, + * but the callback will be invoked with the exception. A blocking commit will wait for a response acknowledging the + * commit. In the event of an error, both types of of commit will retry up to commit.retries times + * before invoking the callback with the error and, for blocking calls, throwing an exception. + * * @param offsets The list of offsets per partition that should be committed to Kafka. * @param commitType Control whether the commit is blocking + * @param callback Callback to invoke when the commit completes */ @Override - public synchronized void commit(final Map offsets, CommitType commitType) { + public synchronized void commit(final Map offsets, CommitType commitType, ConsumerCommitCallback callback) { ensureNotClosed(); log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets); @@ -696,7 +699,26 @@ public class KafkaConsumer implements Consumer { boolean syncCommit = commitType.equals(CommitType.SYNC); if (!syncCommit) this.subscriptions.needRefreshCommits(); - coordinator.commitOffsets(offsets, syncCommit, now); + coordinator.commitOffsets(offsets, syncCommit, callback, now); + } + + /** + * Commits the specified offsets for the specified list of topics and partitions to Kafka. + *

+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every + * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + *

+ * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails. + * A blocking commit will wait for a response acknowledging the + * commit. In the event of an error, both types of of commit will retry up to commit.retries times. + * + * @param offsets The list of offsets per partition that should be committed to Kafka. + * @param commitType Control whether the commit is blocking + */ + @Override + public synchronized void commit(final Map offsets, CommitType commitType) { + commit(offsets, commitType, null); } /** @@ -705,13 +727,29 @@ public class KafkaConsumer implements Consumer { * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API * should not be used. - * + * + * @param commitType Whether or not the commit should block until it is acknowledged. + * @param callback Callback to invoke when the commit completes + */ + @Override + public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) { + ensureNotClosed(); + commit(this.subscriptions.allConsumed(), commitType, callback); + } + + /** + * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after + * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + * * @param commitType Whether or not the commit should block until it is acknowledged. */ @Override public synchronized void commit(CommitType commitType) { ensureNotClosed(); - commit(this.subscriptions.allConsumed(), commitType); + commit(this.subscriptions.allConsumed(), commitType, null); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index f50da82..97a55f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -106,16 +106,29 @@ public class MockConsumer implements Consumer { } @Override - public synchronized void commit(Map offsets, CommitType commitType) { + public synchronized void commit(Map offsets, CommitType commitType, ConsumerCommitCallback callback) { ensureNotClosed(); for (Entry entry : offsets.entrySet()) subscriptions.committed(entry.getKey(), entry.getValue()); + if (callback != null) { + callback.onCommitCompleted(null); + } } @Override - public synchronized void commit(CommitType commitType) { + public synchronized void commit(Map offsets, CommitType commitType) { + commit(offsets, commitType, null); + } + + @Override + public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) { ensureNotClosed(); - commit(this.subscriptions.allConsumed(), commitType); + commit(this.subscriptions.allConsumed(), commitType, callback); + } + + @Override + public synchronized void commit(CommitType commitType) { + commit(commitType, null); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index e55ab11..b7c8101 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -17,10 +17,12 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerCommitCallback; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -74,6 +76,7 @@ public final class Coordinator { private final SubscriptionState subscriptions; private final CoordinatorMetrics sensors; private final long retryBackoffMs; + private final int commitRetries; private Node consumerCoordinator; private String consumerId; private int generation; @@ -85,6 +88,7 @@ public final class Coordinator { String groupId, long retryBackoffMs, long sessionTimeoutMs, + int commitRetries, String assignmentStrategy, Metadata metadata, SubscriptionState subscriptions, @@ -103,6 +107,7 @@ public final class Coordinator { this.subscriptions = subscriptions; this.retryBackoffMs = retryBackoffMs; this.sessionTimeoutMs = sessionTimeoutMs; + this.commitRetries = commitRetries; this.assignmentStrategy = assignmentStrategy; this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds()); this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags); @@ -148,15 +153,16 @@ public final class Coordinator { /** * Commit offsets for the specified list of topics and partitions. 
* - * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails. + * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails. * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until * the commit succeeds. * * @param offsets The list of offsets per partition that should be committed. * @param blocking Control whether the commit is blocking + * @param callback Callback to invoke when the request completes * @param now The current time */ - public void commitOffsets(final Map offsets, boolean blocking, long now) { + public void commitOffsets(final Map offsets, boolean blocking, ConsumerCommitCallback callback, long now) { if (!offsets.isEmpty()) { // create the offset commit request Map offsetData; @@ -169,29 +175,13 @@ public final class Coordinator { OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData); - // send request and possibly wait for response if it is blocking - RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets); - + // Send request and possibly wait for response if it is blocking. 
The CommitOffsetHandler handles + // send + retries + CommitOffsetHandler handler = new CommitOffsetHandler(req, callback, now); if (blocking) { - boolean done; - do { - ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now); - - // check for errors - done = true; - OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody()); - for (short errorCode : commitResponse.responseData().values()) { - if (errorCode != Errors.NONE.code()) - done = false; - } - if (!done) { - log.debug("Error in offset commit, backing off for {} ms before retrying again.", - this.retryBackoffMs); - Utils.sleep(this.retryBackoffMs); - } - } while (!done); - } else { - this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now)); + while (!handler.isComplete()) { + handler.poll(); + } } } } @@ -445,7 +435,6 @@ public final class Coordinator { * @return The response */ private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) { - // send the request this.client.send(clientRequest); @@ -483,37 +472,87 @@ public final class Coordinator { } } - private class CommitOffsetCompletionHandler implements RequestCompletionHandler { + private class CommitOffsetHandler implements RequestCompletionHandler { + + private final OffsetCommitRequest request; + private final ConsumerCommitCallback callback; + private int attempts; + private boolean completed; + private ClientRequest clientRequest; + private Errors error = null; + + public CommitOffsetHandler(OffsetCommitRequest request, ConsumerCommitCallback callback, long now) { + this.request = request; + this.callback = callback; + this.attempts = 0; + this.completed = false; + + // Send the request + clientRequest = initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, request.toStruct(), this, now); + client.send(clientRequest); + } - private final Map offsets; + public boolean isComplete() { + return completed; + } - public 
CommitOffsetCompletionHandler(Map offsets) { - this.offsets = offsets; + public void poll() { + client.completeAll(clientRequest.request().destination()); + if (completed && error != null) { + throw error.exception(); + } } @Override public void onComplete(ClientResponse resp) { if (resp.wasDisconnected()) { handleCoordinatorDisconnect(resp); + error = Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE; } else { OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody()); for (Map.Entry entry : response.responseData().entrySet()) { TopicPartition tp = entry.getKey(); short errorCode = entry.getValue(); - long offset = this.offsets.get(tp); + long offset = request.offsetData().get(tp).offset; if (errorCode == Errors.NONE.code()) { log.debug("Committed offset {} for partition {}", offset, tp); subscriptions.committed(tp, offset); - } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { - coordinatorDead(); } else { - log.error("Error committing partition {} at offset {}: {}", - tp, - offset, - Errors.forCode(errorCode).exception().getMessage()); + Errors newError = Errors.forCode(errorCode); + // Make sure that if any errors are not retriable, the error passed to the callback and thrown + // for sync commits is also not retriable + if (error == null || (error.exception() instanceof RetriableException && !(newError.exception() instanceof RetriableException))) { + error = newError; + } + if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() + || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { + coordinatorDead(); + } else { + log.error("Error committing partition {} at offset {}: {}", + tp, + offset, + error.exception().getMessage()); + } + } + } + } + if (error != null) { + if ((commitRetries < 0 || attempts < commitRetries) && error.exception() instanceof RetriableException) { + attempts++; + error = null; + clientRequest = 
initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, request.toStruct(), this, time.milliseconds()); + client.send(clientRequest); + } else { + if (callback != null) { + callback.onCommitCompleted(error.exception()); } + completed = true; + } + } else { + if (callback != null) { + callback.onCommitCompleted(null); } + completed = true; } sensors.commitLatency.record(resp.requestLatencyMs()); } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java new file mode 100644 index 0000000..5d0c55a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has + * not yet been created. 
+ */ +public class ConsumerCoordinatorNotAvailableException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public ConsumerCoordinatorNotAvailableException() { + super(); + } + + public ConsumerCoordinatorNotAvailableException(String message) { + super(message); + } + + public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) { + super(message, cause); + } + + public ConsumerCoordinatorNotAvailableException(Throwable cause) { + super(cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java new file mode 100644 index 0000000..94d9934 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is + * not a coordinator for. 
+ */ +public class NotCoordinatorForConsumerException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public NotCoordinatorForConsumerException() { + super(); + } + + public NotCoordinatorForConsumerException(String message) { + super(message); + } + + public NotCoordinatorForConsumerException(String message, Throwable cause) { + super(message, cause); + } + + public NotCoordinatorForConsumerException(Throwable cause) { + super(cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java new file mode 100644 index 0000000..f442573 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change + * for that offsets topic partition). 
+ */ +public class OffsetLoadInProgressException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public OffsetLoadInProgressException() { + super(); + } + + public OffsetLoadInProgressException(String message) { + super(message); + } + + public OffsetLoadInProgressException(String message, Throwable cause) { + super(message, cause); + } + + public OffsetLoadInProgressException(Throwable cause) { + super(cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 36aa412..6ca3843 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -53,11 +53,11 @@ public enum Errors { NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), OFFSET_LOAD_IN_PROGRESS(14, - new ApiException("The coordinator is loading offsets and can't process requests.")), + new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")), CONSUMER_COORDINATOR_NOT_AVAILABLE(15, - new ApiException("The coordinator is not available.")), + new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")), NOT_COORDINATOR_FOR_CONSUMER(16, - new ApiException("This is not the correct co-ordinator for this consumer.")), + new NotCoordinatorForConsumerException("This is not the correct co-ordinator for this consumer.")), INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), RECORD_LIST_TOO_LARGE(18, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index b06c4a7..0ef35e3 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -18,12 +18,17 @@ package org.apache.kafka.clients.consumer.internals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.ConsumerCommitCallback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotCoordinatorForConsumerException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -51,6 +56,7 @@ public class CoordinatorTest { private TopicPartition tp = new TopicPartition(topicName, 0); private long retryBackoffMs = 0L; private long sessionTimeoutMs = 10L; + private int defaultCommitRetries = 0; private String rebalanceStrategy = "not-matter"; private MockTime time = new MockTime(); private MockClient client = new MockClient(time); @@ -61,17 +67,7 @@ public class CoordinatorTest { private Metrics metrics = new Metrics(time); private Map metricTags = new LinkedHashMap(); - private Coordinator coordinator = new Coordinator(client, - groupId, - retryBackoffMs, - sessionTimeoutMs, - rebalanceStrategy, - metadata, - subscriptions, - metrics, - "consumer" + groupId, - metricTags, - time); + private Coordinator coordinator; @Before public void setup() { @@ -81,6 +77,7 @@ public class CoordinatorTest { @Test public void testNormalHeartbeat() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // normal heartbeat @@ -93,6 +90,7 @@ public 
class CoordinatorTest { @Test public void testCoordinatorNotAvailable() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // consumer_coordinator_not_available will mark coordinator as unknown @@ -107,6 +105,7 @@ public class CoordinatorTest { @Test public void testNotCoordinator() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // not_coordinator will mark coordinator as unknown @@ -121,6 +120,7 @@ public class CoordinatorTest { @Test public void testIllegalGeneration() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // illegal_generation will cause re-partition @@ -138,6 +138,7 @@ public class CoordinatorTest { @Test public void testCoordinatorDisconnect() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // coordinator disconnect will mark coordinator as unknown @@ -152,17 +153,19 @@ public class CoordinatorTest { @Test public void testNormalJoinGroup() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // normal join group client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code())); assertEquals(Collections.singletonList(tp), - coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds())); + coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds())); assertEquals(0, client.inFlightRequestCount()); } @Test public void testReJoinGroup() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // diconnected from original coordinator will cause re-discover and join again @@ -177,55 +180,151 @@ public class CoordinatorTest { @Test public void testCommitOffsetNormal() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); 
// sync commit + MockCommitCallback cb = new MockCommitCallback(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, cb, time.milliseconds()); + assertEquals(1, cb.invoked); + assertNull(cb.exception); // async commit - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds()); + cb = new MockCommitCallback(); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, cb, time.milliseconds()); client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertEquals(1, cb.invoked); + assertNull(cb.exception); + + // null callback + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, null, time.milliseconds()); } @Test - public void testCommitOffsetError() { + public void testCommitOffsetAsyncCoordinatorNotAvailable() { + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // async commit with coordinator not available + MockCommitCallback cb = new MockCommitCallback(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, cb, time.milliseconds()); assertEquals(1, client.poll(0, time.milliseconds()).size()); assertTrue(coordinator.coordinatorUnknown()); - // resume + assertEquals(1, cb.invoked); + assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception); + } + + @Test + public void testCommitOffsetAsyncNotCoordinator() 
{ + createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // async commit with not coordinator + MockCommitCallback cb = new MockCommitCallback(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, cb, time.milliseconds()); assertEquals(1, client.poll(0, time.milliseconds()).size()); assertTrue(coordinator.coordinatorUnknown()); - // resume + assertEquals(1, cb.invoked); + assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), cb.exception); + } + + @Test + public void testCommitOffsetSyncCoordinatorDisconnected() { + createCoordinator(); + + // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) + MockCommitCallback cb = new MockCommitCallback(); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, cb, time.milliseconds()); + assertEquals(1, cb.invoked); + assertNull(cb.exception); + } + + @Test + public void testCommitOffsetWithRetries() { + createCoordinatorWithRetries(1); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - // sync commit with not_coordinator + // sync commit with error, then success client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - 
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, null, time.milliseconds()); + } - // sync commit with coordinator disconnected - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); + @Test + public void testCommitOffsetWithRetriesFails() { + createCoordinatorWithRetries(3); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds()); + + // sync commit with retriable errors should throw an exception after the expected number of retries; evaluates + // all expected retriable errors, then repeats enough to trigger an exception + try { + // These trigger metadata refresh and need followup consumer metadata response + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()))); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + // These just indicate a you need to wait + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code()))); + // And the final one triggers the failure + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, null, time.milliseconds()); + fail("Coordinator#commitOffsets should have thrown an exception"); + } catch (NotCoordinatorForConsumerException e) { + // Should be no responses left + assertEquals(0, 
client.poll(0, time.milliseconds()).size()); + } } + @Test + public void testCommitOffsetWithRetriesFailsEarlyWithNonRetriableException() { + createCoordinatorWithRetries(2); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + + // sync commit with retries only attempts once when error is non-retriable; run multiple times to check + // each possible unretriable exception + try { + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code()))); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, null, time.milliseconds()); + fail("Coordinator#commitOffsets should have thrown an exception"); + } catch (ApiException e) { + assertEquals(0, client.poll(0, time.milliseconds()).size()); + } + + try { + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code()))); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, null, time.milliseconds()); + fail("Coordinator#commitOffsets should have thrown an exception"); + } catch (ApiException e) { + assertEquals(0, client.poll(0, time.milliseconds()).size()); + } + } @Test - public void testFetchOffset() { + public void testCommitOffsetWithInfiniteRetries() { + createCoordinatorWithRetries(-1); + + // No way to test infinite, but a very large # is good enough + for (int i = 0; i < 10000; i++) { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); + } + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, null, time.milliseconds()); + } + @Test + public void testFetchOffset() { + createCoordinator(); 
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); // normal fetch @@ -281,4 +380,37 @@ public class CoordinatorTest { OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data)); return response.toStruct(); } + + + + private void createCoordinatorWithRetries(int retries) { + coordinator = new Coordinator(client, + groupId, + retryBackoffMs, + sessionTimeoutMs, + retries, + rebalanceStrategy, + metadata, + subscriptions, + metrics, + "consumer" + groupId, + metricTags, + time); + } + + private void createCoordinator() { + createCoordinatorWithRetries(0); + } + + + private static class MockCommitCallback implements ConsumerCommitCallback { + public int invoked = 0; + public Exception exception = null; + + @Override + public void onCommitCompleted(Exception exception) { + invoked++; + this.exception = exception; + } + } } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index ffbdf5d..3b1756f 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -14,17 +14,11 @@ package kafka.api import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.CommitType +import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.TopicPartition -import org.apache.kafka.clients.consumer.NoOffsetForPartitionException -import kafka.utils.{ShutdownableThread, TestUtils, Logging} +import 
kafka.utils.{TestUtils, Logging} import kafka.server.OffsetManager import java.util.ArrayList @@ -45,6 +39,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging { val topic = "topic" val part = 0 val tp = new TopicPartition(topic, part) + val part2 = 1 + val tp2 = new TopicPartition(topic, part2) // configure the servers and clients this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown @@ -59,7 +55,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { super.setUp() // create the test topic with all the brokers as replicas - TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers) + TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers) } def testSimpleConsumption() { @@ -135,6 +131,44 @@ class ConsumerTest extends IntegrationTestHarness with Logging { // another consumer in the same group should get the same position this.consumers(1).subscribe(tp) consumeRecords(this.consumers(1), 1, 5) + + // check async commit callbacks + val commitCallback = new CountConsumerCommitCallback() + this.consumers(0).commit(CommitType.ASYNC, commitCallback) + // shouldn't make progress until poll is invoked + Thread.sleep(10) + assertEquals(0, commitCallback.count) + awaitCommitCallback(this.consumers(0), commitCallback) + } + + def testCommitSpecifiedOffsets() { + sendRecords(5, tp) + sendRecords(7, tp2) + + this.consumers(0).subscribe(tp) + this.consumers(0).subscribe(tp2) + + // Need to poll to join the group + this.consumers(0).poll(50) + val pos1 = this.consumers(0).position(tp) + val pos2 = this.consumers(0).position(tp2) + this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)), CommitType.SYNC) + assertEquals(3, this.consumers(0).committed(tp)) + intercept[NoOffsetForPartitionException] { + this.consumers(0).committed(tp2) + } + // positions should not change + assertEquals(pos1, this.consumers(0).position(tp)) + assertEquals(pos2, 
this.consumers(0).position(tp2)) + this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)), CommitType.SYNC) + assertEquals(3, this.consumers(0).committed(tp)) + assertEquals(5, this.consumers(0).committed(tp2)) + + // Using async should pick up the committed changes after commit completes + val commitCallback = new CountConsumerCommitCallback() + this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)), CommitType.ASYNC, commitCallback) + awaitCommitCallback(this.consumers(0), commitCallback) + assertEquals(7, this.consumers(0).committed(tp2)) } def testPartitionsFor() { @@ -189,9 +223,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging { } } - private def sendRecords(numRecords: Int) { + private def sendRecords(numRecords: Int): Unit = { + sendRecords(numRecords, tp) + } + + private def sendRecords(numRecords: Int, tp: TopicPartition) { val futures = (0 until numRecords).map { i => - this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) + this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes)) } futures.map(_.get) } @@ -216,4 +254,17 @@ class ConsumerTest extends IntegrationTestHarness with Logging { } } + private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = { + val startCount = commitCallback.count + val started = System.currentTimeMillis() + while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000) + this.consumers(0).poll(10000) + assertEquals(startCount + 1, commitCallback.count) + } + + private class CountConsumerCommitCallback extends ConsumerCommitCallback { + var count = 0 + + override def onCommitCompleted(exception: Exception): Unit = count += 1 + } } \ No newline at end of file -- 2.3.5 From 457d265e31fe1967fc6c9b6e0d029a0ed537e07c Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 30 
Apr 2015 11:20:02 -0700 Subject: [PATCH 2/6] KAFKA-2123: Add queuing of offset commit requests. --- .../clients/consumer/internals/Coordinator.java | 28 ++++++++++------ .../consumer/internals/CoordinatorTest.java | 37 +++++++++++++++++++++- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index b7c8101..db31e1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -51,11 +51,7 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -80,6 +76,7 @@ public final class Coordinator { private Node consumerCoordinator; private String consumerId; private int generation; + private Queue commitOffsetRequests; /** * Initialize the coordination manager. @@ -111,6 +108,7 @@ public final class Coordinator { this.assignmentStrategy = assignmentStrategy; this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds()); this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags); + this.commitOffsetRequests = new LinkedList(); } /** @@ -176,8 +174,11 @@ public final class Coordinator { offsetData); // Send request and possibly wait for response if it is blocking. The CommitOffsetHandler handles - // send + retries - CommitOffsetHandler handler = new CommitOffsetHandler(req, callback, now); + // send + retries. 
+ CommitOffsetHandler handler = new CommitOffsetHandler(req, callback); + commitOffsetRequests.add(handler); + if (commitOffsetRequests.size() == 1) + handler.send(now); if (blocking) { while (!handler.isComplete()) { handler.poll(); @@ -481,13 +482,14 @@ public final class Coordinator { private ClientRequest clientRequest; private Errors error = null; - public CommitOffsetHandler(OffsetCommitRequest request, ConsumerCommitCallback callback, long now) { + public CommitOffsetHandler(OffsetCommitRequest request, ConsumerCommitCallback callback) { this.request = request; this.callback = callback; this.attempts = 0; this.completed = false; + } - // Send the request + public void send(long now) { clientRequest = initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, request.toStruct(), this, now); client.send(clientRequest); } @@ -554,6 +556,14 @@ public final class Coordinator { } completed = true; } + + if (completed) { + CommitOffsetHandler removed = commitOffsetRequests.remove(); + assert removed == this; + if (!commitOffsetRequests.isEmpty()) { + commitOffsetRequests.element().send(time.milliseconds()); + } + } sensors.commitLatency.record(resp.requestLatencyMs()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index 0ef35e3..b56b758 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -54,9 +54,9 @@ public class CoordinatorTest { private String topicName = "test"; private String groupId = "test-group"; private TopicPartition tp = new TopicPartition(topicName, 0); + private TopicPartition tp2 = new TopicPartition(topicName, 1); private long retryBackoffMs = 0L; private long sessionTimeoutMs = 10L; - private int defaultCommitRetries = 0; private String rebalanceStrategy = "not-matter"; 
private MockTime time = new MockTime(); private MockClient client = new MockClient(time); @@ -323,6 +323,41 @@ public class CoordinatorTest { } @Test + public void testMultipleCommits() { + createCoordinatorWithRetries(10); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + + // stacking multiple offset commits should, in the face of errors: + // a) process each in order such that we can see intermediate committed offsets & get callbacks + // b) process all in order such that different + // c) finish with the last offset for each topic partition + + // First request will fail, requiring retries + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + + MockCommitCallback cb = new MockCommitCallback(); + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, cb, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp2, 101L), false, cb, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 102L), false, cb, time.milliseconds()); + + assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertEquals(1, cb.invoked); + assertEquals(100L, (long) subscriptions.committed(tp)); + + client.respond(offsetCommitResponse(Collections.singletonMap(tp2, Errors.NONE.code()))); + assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertEquals(2, cb.invoked); + assertEquals(101L, (long) subscriptions.committed(tp2)); + + client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertEquals(3, cb.invoked); + assertEquals(102L, (long) subscriptions.committed(tp)); + } + + @Test public void testFetchOffset() { createCoordinator(); 
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); -- 2.3.5 From 397280fba68aea4106179968d0c3cc5fb10e46d8 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Fri, 1 May 2015 19:11:41 -0700 Subject: [PATCH 3/6] KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for commit retries, and simplify auto commit by using delayed tasks. --- .../kafka/clients/consumer/KafkaConsumer.java | 47 +++++++----- .../clients/consumer/internals/Coordinator.java | 14 +++- .../clients/consumer/internals/DelayedTask.java | 19 +++++ .../consumer/internals/DelayedTaskScheduler.java | 83 ++++++++++++++++++++++ .../consumer/internals/CoordinatorTest.java | 10 +++ .../internals/DelayedTaskSchedulerTest.java | 69 ++++++++++++++++++ .../scala/integration/kafka/api/ConsumerTest.scala | 3 +- 7 files changed, 225 insertions(+), 20 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 02327b8..0eb1709 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,9 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.consumer.internals.Coordinator; -import org.apache.kafka.clients.consumer.internals.Fetcher; -import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import 
org.apache.kafka.clients.consumer.internals.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -357,11 +355,11 @@ public class KafkaConsumer implements Consumer { private final Metrics metrics; private final SubscriptionState subscriptions; private final Metadata metadata; + private final DelayedTaskScheduler scheduler; private final long retryBackoffMs; private final boolean autoCommit; private final long autoCommitIntervalMs; private final ConsumerRebalanceCallback rebalanceCallback; - private long lastCommitAttemptMs; private boolean closed = false; /** @@ -451,7 +449,6 @@ public class KafkaConsumer implements Consumer { this.time = new SystemTime(); this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); - this.lastCommitAttemptMs = time.milliseconds(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), @@ -468,6 +465,7 @@ public class KafkaConsumer implements Consumer { this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); + this.scheduler = new DelayedTaskScheduler(); String metricGrpPrefix = "consumer"; Map metricsTags = new LinkedHashMap(); @@ -487,6 +485,7 @@ public class KafkaConsumer implements Consumer { config.getInt(ConsumerConfig.COMMIT_RETRIES_CONFIG), config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), this.metadata, + this.scheduler, this.subscriptions, metrics, metricGrpPrefix, @@ -525,6 +524,10 @@ public class KafkaConsumer implements Consumer { config.logUnused(); + if (autoCommit) { + 
scheduler.schedule(new AutocommitTask(), time.milliseconds() + autoCommitIntervalMs); + } + log.debug("Kafka consumer created"); } catch (Throwable t) { // call close methods if internal objects are already constructed @@ -651,16 +654,21 @@ public class KafkaConsumer implements Consumer { if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions(), now); - // maybe autocommit position - if (shouldAutoCommit(now)) - commit(CommitType.ASYNC); - /* * initiate any needed fetches, then block for the timeout the user specified */ Cluster cluster = this.metadata.fetch(); fetcher.initFetches(cluster, now); - client.poll(timeout, now); + + long timeoutEnd = now + timeout; + do { + scheduler.poll(now); + long untilTimeout = timeoutEnd - now; + long untilNextTask = scheduler.nextTimeout(now); + long timeoutMs = Math.min(untilTimeout, untilNextTask); + client.poll(timeoutMs, now); + now = time.milliseconds(); + } while (now < timeoutEnd); /* * initiate a fetch request for any nodes that we just got a response from without blocking @@ -693,7 +701,6 @@ public class KafkaConsumer implements Consumer { log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets); long now = time.milliseconds(); - this.lastCommitAttemptMs = now; // commit the offsets with the coordinator boolean syncCommit = commitType.equals(CommitType.SYNC); @@ -888,11 +895,6 @@ public class KafkaConsumer implements Consumer { } } - - private boolean shouldAutoCommit(long now) { - return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs; - } - /** * Request a metadata update and wait until it has occurred */ @@ -900,7 +902,9 @@ public class KafkaConsumer implements Consumer { int version = this.metadata.requestUpdate(); do { long now = time.milliseconds(); - this.client.poll(this.retryBackoffMs, now); + this.scheduler.poll(now); + long timeoutMs = Math.min(this.retryBackoffMs, this.scheduler.nextTimeout(now)); + 
this.client.poll(timeoutMs, now); } while (this.metadata.version() == version); } @@ -985,4 +989,13 @@ public class KafkaConsumer implements Consumer { if (this.closed) throw new IllegalStateException("This consumer has already been closed."); } + + + private class AutocommitTask implements DelayedTask { + @Override + public void run(long now) { + commit(CommitType.ASYNC); + scheduler.schedule(this, now + autoCommitIntervalMs); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index db31e1b..da5fc0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -70,6 +70,7 @@ public final class Coordinator { private final long sessionTimeoutMs; private final String assignmentStrategy; private final SubscriptionState subscriptions; + private final DelayedTaskScheduler scheduler; private final CoordinatorMetrics sensors; private final long retryBackoffMs; private final int commitRetries; @@ -88,6 +89,7 @@ public final class Coordinator { int commitRetries, String assignmentStrategy, Metadata metadata, + DelayedTaskScheduler scheduler, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, @@ -100,6 +102,7 @@ public final class Coordinator { this.consumerId = ""; this.groupId = groupId; this.metadata = metadata; + this.scheduler = scheduler; this.consumerCoordinator = null; this.subscriptions = subscriptions; this.retryBackoffMs = retryBackoffMs; @@ -499,6 +502,7 @@ public final class Coordinator { } public void poll() { + scheduler.poll(time.milliseconds()); client.completeAll(clientRequest.request().destination()); if (completed && error != null) { throw error.exception(); @@ -542,8 +546,14 @@ public final class Coordinator { if ((commitRetries < 0 || attempts < commitRetries) && 
error.exception() instanceof RetriableException) { attempts++; error = null; - clientRequest = initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, request.toStruct(), this, time.milliseconds()); - client.send(clientRequest); + final long now = time.milliseconds(); + scheduler.schedule(new DelayedTask() { + @Override + public void run(long now) { + clientRequest = initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, request.toStruct(), CommitOffsetHandler.this, time.milliseconds()); + client.send(clientRequest); + } + }, now + retryBackoffMs); } else { if (callback != null) { callback.onCommitCompleted(error.exception()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java new file mode 100644 index 0000000..fd749da --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + + +public interface DelayedTask { + void run(long now); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java new file mode 100644 index 0000000..c15dc1d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import java.util.PriorityQueue; + +/** + * Tracks a set of tasks to be executed after a delay. + */ +public class DelayedTaskScheduler { + + private PriorityQueue tasks; + + public DelayedTaskScheduler() { + tasks = new PriorityQueue(); + } + + /** + * Schedule a task for execution in the future. + * + * @param task the task to execute + * @param at the time at which to + */ + public void schedule(DelayedTask task, long at) { + tasks.add(new Entry(task, at)); + } + + /** + * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled. 
+ * + * @return the remaining time in milliseconds + */ + public long nextTimeout(long now) { + if (tasks.isEmpty()) + return Long.MAX_VALUE; + else + return Math.max(tasks.peek().timeout - now, 0); + } + + /** + * Run any ready tasks. + * + * @param now the current time + */ + public void poll(long now) { + while (!tasks.isEmpty() && tasks.peek().timeout <= now) { + Entry entry = tasks.poll(); + entry.task.run(now); + } + } + + + private static class Entry implements Comparable { + DelayedTask task; + long timeout; + + public Entry(DelayedTask task, long timeout) { + this.task = task; + this.timeout = timeout; + } + + @Override + public int compareTo(Entry entry) { + if (this.timeout < entry.timeout) + return -1; + else if (this.timeout > entry.timeout) + return 1; + else + return 0; + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index b56b758..08c8666 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -66,6 +66,7 @@ public class CoordinatorTest { private SubscriptionState subscriptions = new SubscriptionState(); private Metrics metrics = new Metrics(time); private Map metricTags = new LinkedHashMap(); + private DelayedTaskScheduler scheduler = new DelayedTaskScheduler(); private Coordinator coordinator; @@ -342,8 +343,16 @@ public class CoordinatorTest { coordinator.commitOffsets(Collections.singletonMap(tp2, 101L), false, cb, time.milliseconds()); coordinator.commitOffsets(Collections.singletonMap(tp, 102L), false, cb, time.milliseconds()); + // Let the first failure occur but don't + assertEquals(1, client.poll(0, time.milliseconds()).size()); + assertEquals(0, cb.invoked); + assertEquals(retryBackoffMs, scheduler.nextTimeout(time.milliseconds())); + + 
time.sleep(retryBackoffMs); + scheduler.poll(time.milliseconds()); assertEquals(1, client.poll(0, time.milliseconds()).size()); assertEquals(1, cb.invoked); + assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(time.milliseconds())); assertEquals(100L, (long) subscriptions.committed(tp)); client.respond(offsetCommitResponse(Collections.singletonMap(tp2, Errors.NONE.code()))); @@ -426,6 +435,7 @@ public class CoordinatorTest { retries, rebalanceStrategy, metadata, + scheduler, subscriptions, metrics, "consumer" + groupId, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java new file mode 100644 index 0000000..0617e8f --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +public class DelayedTaskSchedulerTest { + + private DelayedTaskScheduler scheduler = new DelayedTaskScheduler(); + private ArrayList executed = new ArrayList(); + + @Test + public void testScheduling() { + // Empty scheduler + assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0)); + scheduler.poll(0); + assertEquals(Collections.emptyList(), executed); + + TestTask task1 = new TestTask(); + TestTask task2 = new TestTask(); + TestTask task3 = new TestTask(); + scheduler.schedule(task1, 20); + assertEquals(20, scheduler.nextTimeout(0)); + scheduler.schedule(task2, 10); + assertEquals(10, scheduler.nextTimeout(0)); + scheduler.schedule(task3, 30); + assertEquals(10, scheduler.nextTimeout(0)); + + scheduler.poll(5); + assertEquals(Collections.emptyList(), executed); + assertEquals(5, scheduler.nextTimeout(5)); + + scheduler.poll(10); + assertEquals(Arrays.asList(task2), executed); + assertEquals(10, scheduler.nextTimeout(10)); + + scheduler.poll(20); + assertEquals(Arrays.asList(task2, task1), executed); + assertEquals(20, scheduler.nextTimeout(10)); + + scheduler.poll(30); + assertEquals(Arrays.asList(task2, task1, task3), executed); + assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30)); + } + + private class TestTask implements DelayedTask { + @Override + public void run(long now) { + executed.add(this); + } + } +} diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3b1756f..b844120 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -50,7 +50,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging { 
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + override def setUp() { super.setUp() -- 2.3.5 From f3258363b50444bf548b12d1bd66fecc45198a4d Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Sat, 2 May 2015 22:03:22 -0700 Subject: [PATCH 4/6] KAFKA-2123: Make synchronous offset commits wait for previous requests to finish in order. --- .../kafka/clients/consumer/internals/Coordinator.java | 4 +++- .../clients/consumer/internals/CoordinatorTest.java | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index da5fc0f..fc84867 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -184,7 +184,9 @@ public final class Coordinator { handler.send(now); if (blocking) { while (!handler.isComplete()) { - handler.poll(); + // Must process the head of the queue since there may be other offset commit requests that need + // to be processed first. 
+ commitOffsetRequests.peek().poll(); } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index 08c8666..b5726fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -343,7 +343,7 @@ public class CoordinatorTest { coordinator.commitOffsets(Collections.singletonMap(tp2, 101L), false, cb, time.milliseconds()); coordinator.commitOffsets(Collections.singletonMap(tp, 102L), false, cb, time.milliseconds()); - // Let the first failure occur but don't + // Let the first failure occur but don't process the retry until the timeout assertEquals(1, client.poll(0, time.milliseconds()).size()); assertEquals(0, cb.invoked); assertEquals(retryBackoffMs, scheduler.nextTimeout(time.milliseconds())); @@ -367,6 +367,22 @@ public class CoordinatorTest { } @Test + public void testMultipleCommitsFinalSync() { + // Tests that stacking multiple offset commits where the final one is synchronous properly processes all of them + createCoordinator(); + + MockCommitCallback cb = new MockCommitCallback(); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + + coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, cb, time.milliseconds()); + coordinator.commitOffsets(Collections.singletonMap(tp, 101L), true, cb, time.milliseconds()); + assertEquals(2, cb.invoked); + assertNull(cb.exception); + } + + @Test public void testFetchOffset() { createCoordinator(); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); -- 2.3.5 From 
eabd6d8c7aa508f99f7df0d0fca098646039329d Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 4 May 2015 09:27:08 -0700 Subject: [PATCH 5/6] KAFKA-2123: Remove redundant calls to ensureNotClosed --- .../src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 0eb1709..6cbaf07 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -740,7 +740,6 @@ public class KafkaConsumer implements Consumer { */ @Override public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) { - ensureNotClosed(); commit(this.subscriptions.allConsumed(), commitType, callback); } @@ -755,7 +754,6 @@ public class KafkaConsumer implements Consumer { */ @Override public synchronized void commit(CommitType commitType) { - ensureNotClosed(); commit(this.subscriptions.allConsumed(), commitType, null); } -- 2.3.5 From 3dd45e5df4e778a3cd7214e886e367c2143a9a3a Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 4 May 2015 22:22:30 -0700 Subject: [PATCH 6/6] KAFKA-2123: Address review comments. 
--- .../kafka/clients/consumer/KafkaConsumer.java | 24 ++++--- .../clients/consumer/internals/Coordinator.java | 18 +++-- .../consumer/internals/DelayedTaskQueue.java | 83 ++++++++++++++++++++++ .../consumer/internals/DelayedTaskScheduler.java | 83 ---------------------- .../consumer/internals/CoordinatorTest.java | 2 +- .../consumer/internals/DelayedTaskQueueTest.java | 69 ++++++++++++++++++ .../internals/DelayedTaskSchedulerTest.java | 69 ------------------ 7 files changed, 179 insertions(+), 169 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java delete mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6cbaf07..a9c6b88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,7 +29,11 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.consumer.internals.*; +import org.apache.kafka.clients.consumer.internals.Coordinator; +import org.apache.kafka.clients.consumer.internals.DelayedTask; +import org.apache.kafka.clients.consumer.internals.DelayedTaskQueue; +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; 
import org.apache.kafka.common.Metric; @@ -355,7 +359,7 @@ public class KafkaConsumer implements Consumer { private final Metrics metrics; private final SubscriptionState subscriptions; private final Metadata metadata; - private final DelayedTaskScheduler scheduler; + private final DelayedTaskQueue delayedTasks; private final long retryBackoffMs; private final boolean autoCommit; private final long autoCommitIntervalMs; @@ -465,7 +469,7 @@ public class KafkaConsumer implements Consumer { this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); - this.scheduler = new DelayedTaskScheduler(); + this.delayedTasks = new DelayedTaskQueue(); String metricGrpPrefix = "consumer"; Map metricsTags = new LinkedHashMap(); @@ -485,7 +489,7 @@ public class KafkaConsumer implements Consumer { config.getInt(ConsumerConfig.COMMIT_RETRIES_CONFIG), config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), this.metadata, - this.scheduler, + this.delayedTasks, this.subscriptions, metrics, metricGrpPrefix, @@ -525,7 +529,7 @@ public class KafkaConsumer implements Consumer { config.logUnused(); if (autoCommit) { - scheduler.schedule(new AutocommitTask(), time.milliseconds() + autoCommitIntervalMs); + delayedTasks.add(new AutocommitTask(), time.milliseconds() + autoCommitIntervalMs); } log.debug("Kafka consumer created"); @@ -662,9 +666,9 @@ public class KafkaConsumer implements Consumer { long timeoutEnd = now + timeout; do { - scheduler.poll(now); + delayedTasks.poll(now); long untilTimeout = timeoutEnd - now; - long untilNextTask = scheduler.nextTimeout(now); + long untilNextTask = delayedTasks.nextTimeout(now); long timeoutMs = Math.min(untilTimeout, untilNextTask); client.poll(timeoutMs, now); now = time.milliseconds(); @@ -900,8 +904,8 @@ public class 
KafkaConsumer implements Consumer { int version = this.metadata.requestUpdate(); do { long now = time.milliseconds(); - this.scheduler.poll(now); - long timeoutMs = Math.min(this.retryBackoffMs, this.scheduler.nextTimeout(now)); + this.delayedTasks.poll(now); + long timeoutMs = Math.min(this.retryBackoffMs, this.delayedTasks.nextTimeout(now)); this.client.poll(timeoutMs, now); } while (this.metadata.version() == version); } @@ -993,7 +997,7 @@ public class KafkaConsumer implements Consumer { @Override public void run(long now) { commit(CommitType.ASYNC); - scheduler.schedule(this, now + autoCommitIntervalMs); + delayedTasks.add(this, now + autoCommitIntervalMs); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index fc84867..0fed27f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -51,7 +51,13 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -70,7 +76,7 @@ public final class Coordinator { private final long sessionTimeoutMs; private final String assignmentStrategy; private final SubscriptionState subscriptions; - private final DelayedTaskScheduler scheduler; + private final DelayedTaskQueue delayedTasks; private final CoordinatorMetrics sensors; private final long retryBackoffMs; private final int commitRetries; @@ -89,7 +95,7 @@ public final class Coordinator { int commitRetries, String assignmentStrategy, Metadata metadata, - DelayedTaskScheduler scheduler, + DelayedTaskQueue delayedTasks, 
SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, @@ -102,7 +108,7 @@ public final class Coordinator { this.consumerId = ""; this.groupId = groupId; this.metadata = metadata; - this.scheduler = scheduler; + this.delayedTasks = delayedTasks; this.consumerCoordinator = null; this.subscriptions = subscriptions; this.retryBackoffMs = retryBackoffMs; @@ -504,7 +510,7 @@ public final class Coordinator { } public void poll() { - scheduler.poll(time.milliseconds()); + delayedTasks.poll(time.milliseconds()); client.completeAll(clientRequest.request().destination()); if (completed && error != null) { throw error.exception(); @@ -549,7 +555,7 @@ public final class Coordinator { attempts++; error = null; final long now = time.milliseconds(); - scheduler.schedule(new DelayedTask() { + delayedTasks.add(new DelayedTask() { @Override public void run(long now) { clientRequest = initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, request.toStruct(), CommitOffsetHandler.this, time.milliseconds()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java new file mode 100644 index 0000000..6d830c7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import java.util.PriorityQueue; + +/** + * Tracks a set of tasks to be executed after a delay. + */ +public class DelayedTaskQueue { + + private PriorityQueue tasks; + + public DelayedTaskQueue() { + tasks = new PriorityQueue(); + } + + /** + * Schedule a task for execution in the future. + * + * @param task the task to execute + * @param at the time at which the task should be run + */ + public void add(DelayedTask task, long at) { + tasks.add(new Entry(task, at)); + } + + /** + * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled. + * + * @return the remaining time in milliseconds + */ + public long nextTimeout(long now) { + if (tasks.isEmpty()) + return Long.MAX_VALUE; + else + return Math.max(tasks.peek().timeout - now, 0); + } + + /** + * Run any ready tasks. 
+ * + * @param now the current time + */ + public void poll(long now) { + while (!tasks.isEmpty() && tasks.peek().timeout <= now) { + Entry entry = tasks.poll(); + entry.task.run(now); + } + } + + + private static class Entry implements Comparable { + DelayedTask task; + long timeout; + + public Entry(DelayedTask task, long timeout) { + this.task = task; + this.timeout = timeout; + } + + @Override + public int compareTo(Entry entry) { + if (this.timeout < entry.timeout) + return -1; + else if (this.timeout > entry.timeout) + return 1; + else + return 0; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java deleted file mode 100644 index c15dc1d..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals; - -import java.util.PriorityQueue; - -/** - * Tracks a set of tasks to be executed after a delay. 
- */ -public class DelayedTaskScheduler { - - private PriorityQueue tasks; - - public DelayedTaskScheduler() { - tasks = new PriorityQueue(); - } - - /** - * Schedule a task for execution in the future. - * - * @param task the task to execute - * @param at the time at which to - */ - public void schedule(DelayedTask task, long at) { - tasks.add(new Entry(task, at)); - } - - /** - * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled. - * - * @return the remaining time in milliseconds - */ - public long nextTimeout(long now) { - if (tasks.isEmpty()) - return Long.MAX_VALUE; - else - return Math.max(tasks.peek().timeout - now, 0); - } - - /** - * Run any ready tasks. - * - * @param now the current time - */ - public void poll(long now) { - while (!tasks.isEmpty() && tasks.peek().timeout <= now) { - Entry entry = tasks.poll(); - entry.task.run(now); - } - } - - - private static class Entry implements Comparable { - DelayedTask task; - long timeout; - - public Entry(DelayedTask task, long timeout) { - this.task = task; - this.timeout = timeout; - } - - @Override - public int compareTo(Entry entry) { - if (this.timeout < entry.timeout) - return -1; - else if (this.timeout > entry.timeout) - return 1; - else - return 0; - } - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index b5726fc..fa8c52d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -66,7 +66,7 @@ public class CoordinatorTest { private SubscriptionState subscriptions = new SubscriptionState(); private Metrics metrics = new Metrics(time); private Map metricTags = new LinkedHashMap(); - private DelayedTaskScheduler scheduler = new DelayedTaskScheduler(); + private 
DelayedTaskQueue scheduler = new DelayedTaskQueue(); private Coordinator coordinator; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java new file mode 100644 index 0000000..b2121ae --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +public class DelayedTaskQueueTest { + + private DelayedTaskQueue scheduler = new DelayedTaskQueue(); + private ArrayList executed = new ArrayList(); + + @Test + public void testScheduling() { + // Empty scheduler + assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0)); + scheduler.poll(0); + assertEquals(Collections.emptyList(), executed); + + TestTask task1 = new TestTask(); + TestTask task2 = new TestTask(); + TestTask task3 = new TestTask(); + scheduler.add(task1, 20); + assertEquals(20, scheduler.nextTimeout(0)); + scheduler.add(task2, 10); + assertEquals(10, scheduler.nextTimeout(0)); + scheduler.add(task3, 30); + assertEquals(10, scheduler.nextTimeout(0)); + + scheduler.poll(5); + assertEquals(Collections.emptyList(), executed); + assertEquals(5, scheduler.nextTimeout(5)); + + scheduler.poll(10); + assertEquals(Arrays.asList(task2), executed); + assertEquals(10, scheduler.nextTimeout(10)); + + scheduler.poll(20); + assertEquals(Arrays.asList(task2, task1), executed); + assertEquals(20, scheduler.nextTimeout(10)); + + scheduler.poll(30); + assertEquals(Arrays.asList(task2, task1, task3), executed); + assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30)); + } + + private class TestTask implements DelayedTask { + @Override + public void run(long now) { + executed.add(this); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java deleted file mode 100644 index 0617e8f..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or 
more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals; - -import static org.junit.Assert.assertEquals; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; - -public class DelayedTaskSchedulerTest { - - private DelayedTaskScheduler scheduler = new DelayedTaskScheduler(); - private ArrayList executed = new ArrayList(); - - @Test - public void testScheduling() { - // Empty scheduler - assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0)); - scheduler.poll(0); - assertEquals(Collections.emptyList(), executed); - - TestTask task1 = new TestTask(); - TestTask task2 = new TestTask(); - TestTask task3 = new TestTask(); - scheduler.schedule(task1, 20); - assertEquals(20, scheduler.nextTimeout(0)); - scheduler.schedule(task2, 10); - assertEquals(10, scheduler.nextTimeout(0)); - scheduler.schedule(task3, 30); - assertEquals(10, scheduler.nextTimeout(0)); - - scheduler.poll(5); - assertEquals(Collections.emptyList(), executed); - assertEquals(5, scheduler.nextTimeout(5)); - - scheduler.poll(10); - assertEquals(Arrays.asList(task2), executed); - assertEquals(10, scheduler.nextTimeout(10)); - - scheduler.poll(20); - assertEquals(Arrays.asList(task2, task1), executed); - assertEquals(20, 
scheduler.nextTimeout(10)); - - scheduler.poll(30); - assertEquals(Arrays.asList(task2, task1, task3), executed); - assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30)); - } - - private class TestTask implements DelayedTask { - @Override - public void run(long now) { - executed.add(this); - } - } -} -- 2.3.5