Details
-
Improvement
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
None
-
Unknown
Description
EventHubsConsumer and EventHubsCheckpointUpdaterTimerTask are calling a blocking method from Azure EventContext when updating a CheckPoint.
In EventHubsConsumer:
private void processCommit(final Exchange exchange, final EventContext eventContext) { ... try { var completionCondition = processCheckpoint(exchange); if (completionCondition.equals(COMPLETED_BY_SIZE)) { eventContext.updateCheckpoint(); <-- HERE processedEvents.set(0); ... }
In EventHubsCheckpointUpdaterTimerTaks
@Override public void run() { if (processedEvents.get() > 0) { ... eventContext.updateCheckpoint(); <-- HERE processedEvents.set(0); } ... }
And corresponding method in Azure EventContext:
public void updateCheckpoint() { this.updateCheckpointAsync().block(); }
I suppose EventContext's async update method should be invoked (and subscribed)
Attachments
Issue Links
- links to