Details
- Bug
- Status: Resolved
- Blocker
- Resolution: Done
- 3.7.0
Description
The two ConsumerDelegate implementations (LegacyKafkaConsumer and AsyncKafkaConsumer) have a fundamental difference related to their use and interpretation of the Timer that is supplied.
tl;dr
AsyncKafkaConsumer is very literal about the timeout, whereas LegacyKafkaConsumer seems to give a little wiggle room.
LegacyKafkaConsumer is structured so that it checks whether an operation succeeded before it checks the timer:
- Submit operation asynchronously
- Wait for operation to complete using NetworkClient.poll()
- Check for result
- If successful, return success
- If fatal failure, return failure
- Check timer
- If timer expired, return failure
AsyncKafkaConsumer uses Future.get() to wait for its operations:
- Submit operation asynchronously
- Wait for operation to complete using Future.get()
- If operation timed out, Future.get() will throw a timeout error
- Check for result
- If successful, return success
- Otherwise, return failure
How to reproduce
This causes subtle timing issues, but they can be easily reproduced via any of the KafkaConsumerTest unit tests that invoke the consumer.poll(0) API. Here's a bit of code that illustrates the difference between the two approaches.
LegacyKafkaConsumer performs a lot of its network I/O operations in a manner similar to this:
    public int getCount(Timer timer) {
        do {
            final RequestFuture<Integer> future = sendSomeRequest(partitions);
            client.poll(future, timer);

            if (future.isDone())
                return future.get();
        } while (timer.notExpired());

        return -1;
    }
AsyncKafkaConsumer has similar logic, but it is structured like this:
    private int getCount(Timer timer) {
        try {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            applicationEventQueue.add(future);
            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        }
    }
The call to add enqueues the network operation, and the method then immediately invokes Future.get() with the remaining time to implement a time-bounded blocking call. When the method is called with a timeout of 0, the enqueued operation has had no chance to complete, so Future.get() immediately throws a TimeoutException.
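The difference can be reproduced with the JDK alone. The following is a minimal sketch, not the actual consumer code: the class name ZeroTimeoutDemo, the method names resultFirst and timeoutFirst, and the value 42 are hypothetical stand-ins chosen for illustration. It assumes that one pass of work on the calling thread is enough to complete the operation in the result-first case, and that nothing has completed the future yet in the timeout-first case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ZeroTimeoutDemo {

    // Result-first (the shape of the LegacyKafkaConsumer loop): one pass of
    // work runs on the calling thread, and the result is checked before the
    // deadline is consulted, so a timeout of 0 can still succeed.
    static int resultFirst(long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            // Hypothetical stand-in for a poll pass that finds a response.
            future.complete(42);
            if (future.isDone())
                return future.join();
        } while (System.nanoTime() - deadlineNs < 0);
        return -1;
    }

    // Timeout-first (the shape of the AsyncKafkaConsumer call): the future is
    // handed off and the caller goes straight to a timed get(). Assumption:
    // nothing has completed the future by the time get() is called.
    static int timeoutFirst(long timeoutMs) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        try {
            // With timeoutMs == 0 and an incomplete future, get() throws
            // TimeoutException without waiting.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resultFirst(0));   // prints 42
        System.out.println(timeoutFirst(0));  // prints -1
    }
}
```

With a timeout of 0, the result-first method still returns the value because the deadline is only consulted after the result check, while the timeout-first method reports failure before any work could have been observed.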
Suggested fix
This task is to design and document the timeout policy for the new Consumer implementation.
The documentation lives here: https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts
Issue Links
- is duplicated by
  - KAFKA-16208 Design new Consumer timeout policy (Resolved)
- relates to
  - KAFKA-15974 Enforce that event processing respects user-provided timeout (Resolved)
  - KAFKA-16200 Enforce that RequestManager implementations respect user-provided timeout (Resolved)