From 6b25c887929056bac0e6716d05326ca2b293d71d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 May 2015 14:51:08 -0700 Subject: [PATCH] KAFKA-2168; Remove synchronization of KafkaConsumer to enable non-blocking close --- .../clients/consumer/ClosedConsumerException.java | 25 +++++++ .../kafka/clients/consumer/KafkaConsumer.java | 83 +++++++++++++--------- .../clients/consumer/internals/Coordinator.java | 23 +++++- .../kafka/clients/consumer/internals/Fetcher.java | 23 +++++- 4 files changed, 118 insertions(+), 36 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java new file mode 100644 index 0000000..e8440df --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.KafkaException; + +/** + * Thrown to indicate that a consumer has been closed and cannot process the given request + */ +public class ClosedConsumerException extends KafkaException { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d301be4..b50e9ff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -297,18 +298,22 @@ import org.slf4j.LoggerFactory; * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively). * *

Multithreaded Processing

- * - * The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application - * making the call. We have intentionally avoided implementing a particular threading model for processing. + * + * The Kafka consumer is NOT thread-safe, which means that you are responsible for ensuring safe multi-threaded + * access. In the absence of external locking, the consumer should be confined to a single thread. It is safe, however + * to invoke {@link #close()} from another thread (e.g. in a shutdown hook). When the consumer is closed from another + * thread, any active request (along with any future requests) will throw {@link ClosedConsumerException}. + * *

- * This leaves several options for implementing multi-threaded processing of records. + * We have intentionally avoided implementing a particular threading model for processing. This leaves several options + * for implementing multi-threaded processing of records. * *

1. One Consumer Per Thread

* - * A simple option is to give each thread it's own consumer instance. Here are the pros and cons of this approach: + * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach: *