Description
The done() method in RecordBatch.java iterates over the thunks and invokes onCompletion() on each callback. However, the call to thunk.future.get() can throw an exception, and when it does, the callback for that record is never invoked. This appears to be the only place where an async send can complete without its callback being called, leaving the callback orphaned.
The call to thunk.future.get() appears to need its own try/catch. If it returns normally, onCompletion() should be called with the result; if it throws, thunk.callback.onCompletion(null, recordException) should be called instead.
e.g.
/**
 * Complete the request
 *
 * @param baseOffset The base offset of the messages assigned by the server
 * @param exception The exception that occurred (or null if the request was successful)
 */
public void done(long baseOffset, RuntimeException exception) {
    this.produceFuture.done(topicPartition, baseOffset, exception);
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
              topicPartition,
              baseOffset,
              exception);
    // execute callbacks
    for (int i = 0; i < this.thunks.size(); i++) {
        try {
            Thunk thunk = this.thunks.get(i);
            if (exception == null) {
                RecordMetadata rc = null;
                try {
                    rc = thunk.future.get();
                } catch (Exception recordException) {
                    thunk.callback.onCompletion(null, recordException);
                }
                if (rc != null) {
                    thunk.callback.onCompletion(rc, null);
                }
            } else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
        }
    }
}
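
The same pattern can be tried outside the producer code. The following is only a minimal sketch under stated assumptions: CallbackDemo, its nested Callback and Thunk types, and the String used in place of RecordMetadata are hypothetical stand-ins, not the actual Kafka classes. It shows that once future.get() has its own try/catch, every callback is invoked exactly once, including the one whose future failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class CallbackDemo {
    /** Hypothetical stand-in for the producer's callback interface. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Hypothetical stand-in for a thunk: a per-record future plus its callback. */
    static final class Thunk {
        final Future<String> future;
        final Callback callback;

        Thunk(Future<String> future, Callback callback) {
            this.future = future;
            this.callback = callback;
        }
    }

    /** Invokes each callback once, even when future.get() throws. */
    static void done(List<Thunk> thunks, RuntimeException exception) {
        for (Thunk thunk : thunks) {
            try {
                if (exception != null) {
                    // Batch-level failure: report it to every callback.
                    thunk.callback.onCompletion(null, exception);
                    continue;
                }
                String metadata;
                try {
                    metadata = thunk.future.get();
                } catch (Exception recordException) {
                    // Per-record failure: still notify the callback.
                    thunk.callback.onCompletion(null, recordException);
                    continue;
                }
                thunk.callback.onCompletion(metadata, null);
            } catch (Exception e) {
                // Only exceptions thrown by the user callback itself land here.
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
    }

    /** Runs one successful and one failed record through done(). */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Callback record = (metadata, e) -> seen.add(e == null ? "ok:" + metadata : "error");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("record failed"));

        List<Thunk> thunks = new ArrayList<>();
        thunks.add(new Thunk(CompletableFuture.completedFuture("offset-0"), record));
        thunks.add(new Thunk(failed, record));

        done(thunks, null);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ok:offset-0, error]
    }
}
```

In this sketch the success path runs only after get() has returned, so the callback cannot be invoked twice for the same record. Exceptions thrown by the user callback are still caught by the outer try/catch and logged, as in the original loop.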