My scenario involves handling business-critical data through Kafka before writing it to the sink system. To get the strongest possible delivery guarantee and control, I use the Kafka component with the allowManualCommit option, as per the documentation. This is required to be absolutely sure that a message is delivered to the sink system before committing the state and moving on to the next item or batch of items.
When I enable the allowManualCommit option but intentionally do not include any step that actually executes the commit, the consumer group offset is still committed when I restart the integration.
The commit is executed because the onPartitionsRevoked event handler in KafkaConsumer.java (L473) is triggered. This happens on graceful termination of the running container.
- Line 485 invokes the commitOffset() method with the forceCommit parameter set to true.
- Line 453 hosts a condition that persists the offset to the custom offsetRepository when allowManualCommit is disabled and an offsetRepository is specified. This condition is skipped, because allowManualCommit is enabled in my scenario.
- Line 458 hosts an alternative condition that executes a commit when the forceCommit parameter is true, and it is indeed true, because the method is invoked by onPartitionsRevoked.
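The branching described above can be modelled in a few lines. This is only a simplified sketch based on the description in this report, not the actual Camel source: the class name `CommitDecisionModel`, the `Action` enum, and the boolean parameters are hypothetical stand-ins for the endpoint configuration and the real side effects.

```java
public class CommitDecisionModel {

    // Hypothetical labels for the side effect each branch would cause.
    enum Action { SAVE_TO_OFFSET_REPOSITORY, COMMIT_TO_CONSUMER_GROUP, NONE }

    // Simplified model of the two conditions described in this report.
    static Action commitOffset(boolean allowManualCommit,
                               boolean hasOffsetRepository,
                               boolean forceCommit) {
        if (!allowManualCommit && hasOffsetRepository) {
            // First condition: persist to the custom offset repository.
            return Action.SAVE_TO_OFFSET_REPOSITORY;
        } else if (forceCommit) {
            // Alternative condition: commit regardless of allowManualCommit.
            return Action.COMMIT_TO_CONSUMER_GROUP;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Scenario from this report: manual commit enabled, no repository,
        // forceCommit = true because the call comes from onPartitionsRevoked.
        System.out.println(commitOffset(true, false, true));
    }
}
```

In this model the reported scenario ends up in the forceCommit branch, even though manual commit was requested and never executed.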
The hidden side effect of this scenario is that the offset is committed even though no commit was ever ordered. This can lead to the loss of messages, as there is no guarantee that the messages were processed successfully.
- Execute the forceCommit section only when allowManualCommit is disabled. With manual commit enabled, the developer should be in charge in all cases.
It must also be ensured that, when a custom offsetRepository is used, state is never persisted to both the custom repository and the standard consumer group. Currently that is what happens when offsetRepository is paired with an enabled allowManualCommit, through the scenario described above as well.
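One possible shape of the proposed behaviour, again as a simplified model rather than a patch against the real code. The class name `ProposedCommitDecision`, the `Action` enum, and the boolean parameters are hypothetical; the sketch only illustrates the two requirements above: no forced commit when manual commit is enabled, and never more than one persistence target per call.

```java
public class ProposedCommitDecision {

    // Hypothetical labels for the side effect each branch would cause.
    enum Action { SAVE_TO_OFFSET_REPOSITORY, COMMIT_TO_CONSUMER_GROUP, NONE }

    static Action commitOffset(boolean allowManualCommit,
                               boolean hasOffsetRepository,
                               boolean forceCommit) {
        if (allowManualCommit) {
            // Manual commit: the developer is in charge, nothing is forced.
            return Action.NONE;
        }
        if (hasOffsetRepository) {
            // Custom repository configured: it is the only persistence target.
            return Action.SAVE_TO_OFFSET_REPOSITORY;
        }
        if (forceCommit) {
            // No manual commit and no repository: a forced commit is allowed.
            return Action.COMMIT_TO_CONSUMER_GROUP;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Scenario from this report: with the guard in place, nothing is committed.
        System.out.println(commitOffset(true, false, true));
    }
}
```

Because each call returns exactly one action, the model cannot persist state to both the repository and the consumer group in the same call.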