Details
-
Task
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.17.0
-
None
-
Novice
Description
In the startPolling() method, if the consumer is suspending, then the unsubscribe method is called which permanently closes the consumer
Duration pollDuration = Duration.ofMillis(pollTimeoutMs); while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { if (!consumerListener.afterConsume(consumer)) { continue; } } ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); if (result.isBreakOnErrorHit()) { LOG.debug("We hit an error ... setting flags to force reconnect"); // force re-connect setReconnect(true); setConnected(false); } updateTaskState(); } if (!isConnected()) { LOG.debug("Not reconnecting, check whether to auto-commit or not ..."); commitManager.commit(); } safeUnsubscribe(); } catch (InterruptException e) {
Attachments
Issue Links
- is a clone of
-
CAMEL-17947 camel-kafka: fix concurrent access in camel-health
- Resolved
- links to