From a879de30133482c20d87f426ce5b1429dde4b404 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Tue, 14 Apr 2015 15:13:17 -0700
Subject: [PATCH] WIP KAFKA-2123: Make new consumer offset commit API use a
 callback and return a Future.

---
 .../java/org/apache/kafka/clients/KafkaClient.java |  10 ++
 .../org/apache/kafka/clients/NetworkClient.java    |  19 ++++
 .../apache/kafka/clients/consumer/CommitType.java  |  17 ----
 .../apache/kafka/clients/consumer/Consumer.java    |  19 +++-
 .../clients/consumer/ConsumerCommitCallback.java   |  34 +++++++
 .../kafka/clients/consumer/KafkaConsumer.java      |  45 +++++----
 .../kafka/clients/consumer/MockConsumer.java       |  51 +++++++++-
 .../clients/consumer/internals/CommitFuture.java   |  87 +++++++++++++++++
 .../clients/consumer/internals/Coordinator.java    | 105 ++++++++++++++-------
 .../consumer/internals/SubscriptionState.java      |   2 +-
 .../java/org/apache/kafka/clients/MockClient.java  |  58 +++++++++---
 .../kafka/clients/consumer/MockConsumerTest.java   |   2 +-
 .../consumer/internals/CoordinatorTest.java        |  68 ++++++++++---
 .../clients/producer/internals/SenderTest.java     |   5 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |   5 +-
 .../scala/integration/kafka/api/ConsumerTest.scala |   5 +-
 16 files changed, 423 insertions(+), 109 deletions(-)
 delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitFuture.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 96ac6d0..bdfe24f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -80,6 +80,16 @@ public interface KafkaClient {
     public List<ClientResponse> poll(long timeout, long now);
 
     /**
+     * Poll a specific node.
+     *
+     * @param node The node to complete requests for
+     * @param timeout The maximum amount of time to wait for responses in ms
+     * @param now The current time in ms
+     * @return All responses that complete during this time period.
+     */
+    public List<ClientResponse> poll(int node, long timeout, long now);
+
+    /**
      * Complete all in-flight requests for a given node
      *
     * @param node The node to complete requests for
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b7ae595..e6ccea6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -245,6 +245,25 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Await all the outstanding responses for requests on the given connection, waiting at most timeout milliseconds.
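+     * Note that this is implemented by muting every other connection and unmuting only the given node, so network
+     * I/O with all other nodes is paused until the call returns.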
+     *
+     * @param node The node to poll
+     * @param timeout The maximum amount of time to wait for responses in ms
+     * @param now The current time in ms
+     * @return The list of responses
+     */
+    @Override
+    public List<ClientResponse> poll(int node, long timeout, long now) {
+        try {
+            this.selector.muteAll();
+            this.selector.unmute(node);
+            return poll(timeout, now);
+        } finally {
+            this.selector.unmuteAll();
+        }
+    }
+
+    /**
      * Await all the outstanding responses for requests on the given connection
      *
      * @param node The node to block on
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
deleted file mode 100644
index 7548a9b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-public enum CommitType {
-    SYNC, ASYNC
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 8f587bc..d464611 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -16,6 +16,7 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
@@ -59,14 +60,24 @@ public interface Consumer<K, V> extends Closeable {
     public ConsumerRecords<K, V> poll(long timeout);
 
     /**
-     * @see KafkaConsumer#commit(CommitType)
+     * @see KafkaConsumer#commit()
      */
-    public void commit(CommitType commitType);
+    public Future<Void> commit();
 
     /**
-     * @see KafkaConsumer#commit(Map, CommitType)
+     * @see KafkaConsumer#commit(Map)
      */
-    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
+    public Future<Void> commit(Map<TopicPartition, Long> offsets);
+
+    /**
+     * @see KafkaConsumer#commit(ConsumerCommitCallback)
+     */
+    public Future<Void> commit(ConsumerCommitCallback callback);
+
+    /**
+     * @see KafkaConsumer#commit(Map, ConsumerCommitCallback)
+     */
+    public Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
 
     /**
      * @see KafkaConsumer#seek(TopicPartition, long)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
new file mode 100644
index 0000000..21c966b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * A callback interface that the user can implement to allow code to execute when the commit request is complete.
+ *
+ * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call in which
+ * the commit response is received.
+ */
+public interface ConsumerCommitCallback {
+
+    /**
+     * A callback method the user can implement to provide asynchronous handling of commit request completion.
+     * This method will be called when the commit request sent to the server has been acknowledged.
+     *
+     * @param exception The exception thrown during processing of this request. Null if no error occurred.
+     */
+    public void onCompletion(Exception exception);
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2124334..1b397b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -71,10 +72,10 @@ import org.slf4j.LoggerFactory;
  * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
  * every time the consumer receives data calls {@link #poll(long)} and receives messages.
  * <p>
- * The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the
+ * The {@link #commit() committed position} is the last offset that has been saved securely. Should the
  * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
  * offsets periodically, or it can choose to control this committed position manually by calling
- * {@link #commit(CommitType) commit}.
+ * {@link #commit() commit}.
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
@@ -188,7 +189,7 @@ import org.slf4j.LoggerFactory;
  *             buffer.add(record);
  *             if (buffer.size() >= commitInterval) {
  *                 insertIntoDb(buffer);
- *                 consumer.commit(CommitType.SYNC);
+ *                 consumer.commit().get();
  *                 buffer.clear();
  *             }
  *         }
@@ -594,7 +595,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
      * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
      * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
-     * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
+     * offset using {@link #commit(Map) commit(offsets)} for the subscribed list of partitions.
      *
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits
      *            indefinitely. Must not be negative
@@ -625,7 +626,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             // maybe autocommit position
             if (shouldAutoCommit(now))
-                commit(CommitType.ASYNC);
+                commit();
 
             /*
              * initiate any needed fetches, then block for the timeout the user specified
@@ -655,21 +656,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * the commit succeeds.
      *
      * @param offsets The list of offsets per partition that should be committed to Kafka.
-     * @param commitType Control whether the commit is blocking
+     * @param callback A user-supplied callback to execute when the commit request has been acknowledged by the server
+     *            (null indicates no callback)
+     * @return A Future for this request. Invoking {@link java.util.concurrent.Future#get() get()} will always return
+     *         null on success, because the commit has a {@link Void} result type, but it will throw an exception if
+     *         the commit failed.
      */
     @Override
-    public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+    public synchronized Future<Void> commit(final Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
         ensureNotClosed();
-        log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+        log.debug("Committing offsets: {}", offsets);
         long now = time.milliseconds();
         this.lastCommitAttemptMs = now;
 
         // commit the offsets with the coordinator
-        boolean syncCommit = commitType.equals(CommitType.SYNC);
-        if (!syncCommit)
-            this.subscriptions.needRefreshCommits();
-        coordinator.commitOffsets(offsets, syncCommit, now);
+        return coordinator.commitOffsets(offsets, callback, now);
     }
 
     /**
@@ -678,13 +680,24 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
-     *
-     * @param commitType Whether or not the commit should block until it is acknowledged.
+     *
+     * @param callback A user-supplied callback to execute when the commit request has been acknowledged by the server
+     *            (null indicates no callback)
      */
     @Override
-    public synchronized void commit(CommitType commitType) {
+    public synchronized Future<Void> commit(ConsumerCommitCallback callback) {
         ensureNotClosed();
-        commit(this.subscriptions.allConsumed(), commitType);
+        return commit(this.subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public synchronized Future<Void> commit(final Map<TopicPartition, Long> offsets) {
+        return commit(offsets, null);
+    }
+
+    @Override
+    public synchronized Future<Void> commit() {
+        return commit((ConsumerCommitCallback) null);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f50da82..45f6e40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -19,6 +19,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Metric;
@@ -106,16 +110,28 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+    public synchronized Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
         ensureNotClosed();
         for (Entry<TopicPartition, Long> entry : offsets.entrySet())
             subscriptions.committed(entry.getKey(), entry.getValue());
+        if (callback != null)
+            callback.onCompletion(null);
+        return new CommitFuture();
     }
 
     @Override
-    public synchronized void commit(CommitType commitType) {
+    public synchronized Future<Void> commit(ConsumerCommitCallback callback) {
         ensureNotClosed();
-        commit(this.subscriptions.allConsumed(), commitType);
+        return commit(this.subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public synchronized Future<Void> commit(Map<TopicPartition, Long> offsets) {
+        return commit(offsets, null);
+    }
+
+    @Override
+    public synchronized Future<Void> commit() {
+        ensureNotClosed();
+        return commit(this.subscriptions.allConsumed(), null);
     }
 
     @Override
@@ -179,4 +195,33 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         if (this.closed)
             throw new IllegalStateException("This consumer has already been closed.");
     }
+
+
+    // Since the MockConsumer currently handles all commits synchronously, this Future is always in a completed state
+    private static class CommitFuture implements Future<Void> {
+        @Override
+        public boolean cancel(boolean b) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+        @Override
+        public Void get() throws InterruptedException, ExecutionException {
+            return null;
+        }
+
+        @Override
+        public Void get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+            return null;
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitFuture.java
new file mode 100644
index 0000000..a8cb0f7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitFuture.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.concurrent.*;
+
+/**
+ * The Future of a consumer commit request
+ */
+public class CommitFuture implements Future<Void> {
+    private final Coordinator.CommitFutureResolver resolver;
+    private boolean finished = false;
+    private KafkaException exception;
+
+    public CommitFuture(Coordinator.CommitFutureResolver resolver) {
+        this.resolver = resolver;
+        this.finished = resolver == null;
+        this.exception = null;
+    }
+
+    @Override
+    public boolean cancel(boolean b) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public synchronized boolean isDone() {
+        return finished;
+    }
+
+    @Override
+    public Void get() throws InterruptedException, ExecutionException {
+        while (!isDone()) {
+            try {
+                get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                // Ignore since this could be a timeout from get(), or it could be the result of the future. The loop
+                // guard + return statement will ensure we sort it out properly
+            }
+        }
+        return result();
+    }
+
+    @Override
+    public Void get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+        if (isDone()) {
+            return result();
+        }
+        resolver.resolve(l, timeUnit);
+        return result();
+    }
+
+    public synchronized void complete(KafkaException e) {
+        this.exception = e;
+        finished = true;
+    }
+
+    private Void result() {
+        if (exception != null) {
+            throw exception;
+        }
+        return null;
+    }
+}
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e55ab11..f9d5448 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -17,10 +17,12 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -54,7 +56,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * This class manage the coordination process with the consumer coordinator.
  */
@@ -148,15 +152,15 @@ public final class Coordinator {
     /**
      * Commit offsets for the specified list of topics and partitions.
      *
-     * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
+     * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
      * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
      * the commit succeeds.
      *
      * @param offsets The list of offsets per partition that should be committed.
-     * @param blocking Control whether the commit is blocking
+     * @param callback User-specified commit callback
      * @param now The current time
      */
-    public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) {
+    public Future<Void> commitOffsets(final Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback, long now) {
         if (!offsets.isEmpty()) {
             // create the offset commit request
             Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
@@ -170,29 +174,14 @@ public final class Coordinator {
                 offsetData);
 
             // send request and possibly wait for response if it is blocking
-            RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
-
-            if (blocking) {
-                boolean done;
-                do {
-                    ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
-
-                    // check for errors
-                    done = true;
-                    OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
-                    for (short errorCode : commitResponse.responseData().values()) {
-                        if (errorCode != Errors.NONE.code())
-                            done = false;
-                    }
-                    if (!done) {
-                        log.debug("Error in offset commit, backing off for {} ms before retrying again.",
-                            this.retryBackoffMs);
-                        Utils.sleep(this.retryBackoffMs);
-                    }
-                } while (!done);
-            } else {
-                this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
-            }
+            CommitOffsetCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, callback);
+            final ClientRequest clientRequest = initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+            // The coordinator isn't guaranteed to be valid until after initiateCoordinatorRequest, which invokes
+            // ensureCoordinatorReady.
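+            // Record the destination node so that the future's resolver can later poll just this connection.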
+            handler.setDestination(clientRequest.request().destination());
+            this.client.send(clientRequest);
+            return handler.getFuture();
+        } else {
+            return new CommitFuture(null);
         }
     }
 
@@ -483,20 +472,42 @@ public final class Coordinator {
         }
     }
 
-    private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
+    public interface CommitFutureResolver {
+        public void resolve(long l, TimeUnit timeUnit) throws TimeoutException;
+    }
+
+    private class CommitOffsetCompletionHandler implements RequestCompletionHandler, CommitFutureResolver {
+        private int destination;
         private final Map<TopicPartition, Long> offsets;
+        private final ConsumerCommitCallback callback;
+        private final CommitFuture future;
 
-        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
+        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
             this.offsets = offsets;
+            this.callback = callback;
+            this.future = new CommitFuture(this);
+        }
+
+        public void setDestination(int destination) {
+            this.destination = destination;
+        }
+
+        public CommitFuture getFuture() {
+            return future;
         }
 
         @Override
         public void onComplete(ClientResponse resp) {
             if (resp.wasDisconnected()) {
                 handleCoordinatorDisconnect(resp);
+                ApiException exception = Errors.NETWORK_EXCEPTION.exception();
+                if (callback != null)
+                    callback.onCompletion(exception);
+                future.complete(exception);
             } else {
-                OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
+                final OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
+                ApiException firstException = null;
                 for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
                     TopicPartition tp = entry.getKey();
                     short errorCode = entry.getValue();
@@ -504,19 +515,41 @@ public final class Coordinator {
                     if (errorCode == Errors.NONE.code()) {
                         log.debug("Committed offset {} for partition {}", offset, tp);
                         subscriptions.committed(tp, offset);
-                    } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                            || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                        coordinatorDead();
                     } else {
-                        log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
+                        if (firstException == null)
+                            firstException = Errors.forCode(errorCode).exception();
+                        if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                                || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                            coordinatorDead();
+                        } else {
+                            log.error("Error committing partition {} at offset {}: {}",
+                                tp,
+                                offset,
+                                Errors.forCode(errorCode).exception().getMessage());
+                        }
                     }
                 }
+                if (callback != null)
+                    callback.onCompletion(firstException);
+                future.complete(firstException);
             }
             sensors.commitLatency.record(resp.requestLatencyMs());
         }
+
+        @Override
+        public void resolve(long l, TimeUnit timeUnit) throws TimeoutException {
+            // All the handling of results happens in the callbacks, this just needs to drive the client until
+            // the response is received.
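+            // Convert the caller's timeout to milliseconds and track an absolute deadline for the polling loop.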
+            long timeoutMs = timeUnit.toMillis(l);
+            long now = time.milliseconds();
+            long deadline = now + timeoutMs;
+            while (client.inFlightRequestCount(destination) > 0 && now < deadline) {
+                client.poll(destination, deadline - now, now);
+                now = time.milliseconds();
+            }
+            if (client.inFlightRequestCount(destination) > 0) {
+                throw new TimeoutException();
+            }
+        }
     }
 
     private class CoordinatorMetrics {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index cee7541..c5df750 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -62,7 +62,7 @@ public class SubscriptionState {
 
     public void subscribe(String topic) {
         if (this.subscribedPartitions.size() > 0)
-            throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+            throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
         if (!this.subscribedTopics.contains(topic)) {
             this.subscribedTopics.add(topic);
             this.needsPartitionAssignment = true;
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 5e3fab1..58e2078 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -49,7 +49,9 @@ public class MockClient implements KafkaClient {
     private int correlation = 0;
     private Node node = null;
     private final Set<Integer> ready = new HashSet<Integer>();
+    private final Set<Integer> pendingDisconnects = new HashSet<Integer>();
     private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
+    private final Queue<ClientRequest> requestsWithResponses = new ArrayDeque<ClientRequest>();
     private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
 
@@ -79,15 +81,30 @@ public class MockClient implements KafkaClient {
     }
 
     public void disconnect(Integer node) {
-        Iterator<ClientRequest> iter = requests.iterator();
-        while (iter.hasNext()) {
-            ClientRequest request = iter.next();
-            if (request.request().destination() == node) {
-                responses.add(new ClientResponse(request, time.milliseconds(), true, null));
-                iter.remove();
+        // To ensure correct in flight request counts, we can't adjust requests/responses until the next poll()
+        pendingDisconnects.add(node);
+        ready.remove(node);
+    }
+
+    private void processDisconnects() {
+        for (Integer node : pendingDisconnects) {
+            for (ClientResponse response : responses) {
+                assert response.request().request().destination() != node || response.wasDisconnected() :
+                    "Disconnected nodes in MockClient should only have disconnect responses queued.";
             }
+
+            // Generate disconnect responses for any outstanding requests
+            Iterator<ClientRequest> iter = requests.iterator();
+            while (iter.hasNext()) {
+                ClientRequest request = iter.next();
+                if (request.request().destination() == node) {
+                    iter.remove();
+                    requestsWithResponses.add(request);
+                    responses.add(new ClientResponse(request, time.milliseconds(), true, null));
+                }
+            }
         }
-        ready.remove(node);
+        pendingDisconnects.clear();
    }
 
     @Override
@@ -96,17 +113,27 @@ public class MockClient implements KafkaClient {
             FutureResponse futureResp = futureResponses.poll();
             ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
             responses.add(resp);
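+            // Track the request until poll() delivers its queued response, so that in-flight request counts
+            // (which include queued responses) stay consistent.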
+            this.requestsWithResponses.add(request);
         } else {
             this.requests.add(request);
         }
     }
 
     @Override
+    public List<ClientResponse> poll(int node, long timeoutMs, long now) {
+        // Can't currently filter processing to specific nodes
+        return poll(timeoutMs, now);
+    }
+
+    @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
+        processDisconnects();
+
         List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
 
         while (!this.responses.isEmpty()) {
             ClientResponse response = this.responses.poll();
+            this.requestsWithResponses.remove();
             if (response.request().hasCallback())
                 response.request().callback().onComplete(response);
         }
@@ -122,13 +149,16 @@ public class MockClient implements KafkaClient {
     @Override
     public List<ClientResponse> completeAll(long now) {
         List<ClientResponse> responses = poll(0, now);
-        if (requests.size() > 0)
+        if (requests.size() > 0 || requestsWithResponses.size() > 0)
             throw new IllegalStateException("Requests without responses remain.");
         return responses;
     }
 
-    public Queue<ClientRequest> requests() {
-        return this.requests;
+    public ClientRequest peek() {
+        ClientRequest result = this.requestsWithResponses.peek();
+        if (result == null)
+            result = this.requests.peek();
+        return result;
     }
 
     public void respond(Struct body) {
@@ -137,6 +167,7 @@ public class MockClient implements KafkaClient {
 
     public void respond(Struct body, boolean disconnected) {
         ClientRequest request = requests.remove();
+        requestsWithResponses.add(request);
         responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
     }
 
@@ -154,12 +185,15 @@ public class MockClient implements KafkaClient {
 
     @Override
     public int inFlightRequestCount() {
-        return requests.size();
+        // Queued responses are included here because the requests they correspond to are technically still outstanding
+        // since poll() hasn't been called for them yet.
+        return requests.size() + requestsWithResponses.size();
     }
 
     @Override
     public int inFlightRequestCount(int nodeId) {
-        return requests.size();
+        // Doesn't support per-node counts yet
+        return inFlightRequestCount();
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 677edd3..24c0489 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -41,7 +41,7 @@ public class MockConsumerTest {
         assertEquals(rec2, iter.next());
         assertFalse(iter.hasNext());
         assertEquals(1L, consumer.position(new TopicPartition("test", 0)));
-        consumer.commit(CommitType.SYNC);
+        consumer.commit();
         assertEquals(1L, consumer.committed(new TopicPartition("test", 0)));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index b06c4a7..d4760a7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -16,14 +16,13 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -43,6 +42,8 @@ import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class CoordinatorTest {
 
@@ -176,50 +177,84 @@ public class CoordinatorTest {
 
     @Test
-    public void testCommitOffsetNormal() {
+    public void testCommitOffsetNormal() throws Exception {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
 
         // sync commit
+        CommitCallback callback = new CommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds()).get();
+        assertTrue(callback.invoked);
+        assertNull(callback.exception);
 
         // async commit
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        callback = new CommitCallback();
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds());
         client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(callback.invoked);
+        assertNull(callback.exception);
     }
 
     @Test
-    public void testCommitOffsetError() {
+    public void testCommitOffsetError() throws Exception {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
 
         // async commit with coordinator not available
+        CommitCallback callback = new CommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds());
         assertEquals(1, client.poll(0, time.milliseconds()).size());
         assertTrue(coordinator.coordinatorUnknown());
+        assertTrue(callback.invoked);
+        assertNotNull(callback.exception);
         // resume
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
 
         // async commit with not coordinator
+        callback = new CommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds());
         assertEquals(1, client.poll(0, time.milliseconds()).size());
         assertTrue(coordinator.coordinatorUnknown());
+        assertTrue(callback.invoked);
+        assertNotNull(callback.exception);
         // resume
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
 
         // sync commit with not_coordinator
+        callback = new CommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        try {
+            coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds()).get();
+            fail();
+        } catch (ApiException e) {
+        }
+        assertTrue(callback.invoked);
+        assertNotNull(callback.exception);
+        callback = new CommitCallback();
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds()).get();
+        assertTrue(callback.invoked);
+        assertNull(callback.exception);
 
         // sync commit with coordinator disconnected
+        callback = new CommitCallback();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        try {
+            coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds()).get();
+            fail();
+        } catch (ApiException e) {
+        }
+        assertTrue(callback.invoked);
+        assertNotNull(callback.exception);
+        callback = new CommitCallback();
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), callback, time.milliseconds()).get();
+        assertTrue(callback.invoked);
+        assertNull(callback.exception);
     }
 
@@ -281,4 +316,15 @@ public class CoordinatorTest {
         OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
         return response.toStruct();
     }
+
+    private static class CommitCallback implements ConsumerCommitCallback {
+        public boolean invoked = false;
+        public Exception exception = null;
+
+        @Override
+        public void onCompletion(Exception exception) {
+            invoked = true;
+            this.exception = exception;
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8b1805d..f101304 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -103,7 +103,8 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
-        client.disconnect(client.requests().peek().request().destination());
+        client.disconnect(client.peek().request().destination());
+        sender.run(time.milliseconds());
         assertEquals(0, client.inFlightRequestCount());
         sender.run(time.milliseconds()); // receive error
         sender.run(time.milliseconds()); // reconnect
@@ -119,7 +120,7 @@ public class SenderTest {
         future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
-            client.disconnect(client.requests().peek().request().destination());
+            client.disconnect(client.peek().request().destination());
             sender.run(time.milliseconds()); // receive error
             sender.run(time.milliseconds()); // reconnect
             sender.run(time.milliseconds()); // resend
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 35f4f46..32dbe2e 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -19,7 +19,6 @@ import kafka.server.KafkaConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
 import org.apache.kafka.common.TopicPartition
 
 import kafka.utils.{ShutdownableThread, TestUtils, Logging}
@@ -85,7 +84,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
           assertEquals(consumed.toLong, record.offset())
           consumed += 1
         }
-        consumer.commit(CommitType.SYNC)
+        consumer.commit().get()
 
         if (consumed == numRecords) {
           consumer.seekToBeginning()
@@ -122,7 +121,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
             assertEquals(pos, consumer.position(tp))
           } else if (coin == 2) {
             info("Committing offset.")
-            consumer.commit(CommitType.SYNC)
+            consumer.commit().get()
             assertEquals(consumer.position(tp), consumer.committed(tp))
           }
         }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index ffbdf5d..f1ed5fc 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -19,7 +19,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
@@ -122,12 +121,12 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).subscribe(tp)
     assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commit(CommitType.SYNC)
+    this.consumers(0).commit().get()
     assertEquals(0L, this.consumers(0).committed(tp))
 
     consumeRecords(this.consumers(0), 5, 0)
     assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
-    this.consumers(0).commit(CommitType.SYNC)
+    this.consumers(0).commit().get()
     assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
 
     sendRecords(1)
-- 
2.3.3
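
Usage sketch (illustrative only, not part of the diff): with this patch a commit can either be awaited
synchronously through the returned Future or handled asynchronously through a ConsumerCommitCallback. The
consumer construction is assumed to happen elsewhere:

    // Assumes an already-configured consumer, e.g.
    // KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);

    // Synchronous commit: get() returns null on success and throws if the commit failed.
    consumer.commit().get();

    // Asynchronous commit: the callback is invoked from poll() once the broker
    // acknowledges the commit; a null exception indicates success.
    consumer.commit(new ConsumerCommitCallback() {
        @Override
        public void onCompletion(Exception exception) {
            if (exception != null)
                System.err.println("Commit failed: " + exception);
        }
    });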