diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 9caca05..91a606f 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -53,24 +53,26 @@ public abstract class SourceTask implements Task {
/**
*
- * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
- * method should block until the commit is complete.
+ * Notification that the offsets derived from all records returned by {@link #poll()} up to and including a
+ * given {@link SourceRecord} have been written and flushed to offset storage. No further offsets will be
+ * written before this method returns.
*
*
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
- * automatically. This hook is provided for systems that also need to store offsets internally
- * in their own system.
+ * automatically. This hook is provided for systems that need to react to such an event.
*
+ *
+ * @param record Last polled {@link SourceRecord} that had its offsets successfully written and flushed; may be null when a commit round had nothing to flush.
*/
- public void commit() throws InterruptedException {
+ public void offsetsCommitted(SourceRecord record) throws InterruptedException {
// This space intentionally left blank.
}
/**
* Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
* trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
- * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and
- * {@link #commit()}.
+ * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()},
+ * {@link #recordCommitted(SourceRecord)} and {@link #offsetsCommitted(SourceRecord)}.
*
* For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
* could set a flag that will force {@link #poll()} to exit immediately and invoke
@@ -80,18 +82,19 @@ public abstract class SourceTask implements Task {
/**
*
- * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
+ * Notification that the outgoing Kafka record derived from a given {@link SourceRecord} has been successfully
+ * sent to and acknowledged by Kafka, or that the given {@link SourceRecord} has been filtered out by a transformation.
*
*
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
- * automatically. This hook is provided for systems that also need to store offsets internally
- * in their own system.
+ * automatically. This hook is provided for systems that need to react to such an event.
*
*
- * @param record {@link SourceRecord} that was successfully sent via the producer.
+ * @param record The {@link SourceRecord} that was successfully sent to and acknowledged by Kafka, or that was
+ *               filtered out by a transformation and therefore never sent.
* @throws InterruptedException
*/
- public void commitRecord(SourceRecord record) throws InterruptedException {
+ public void recordCommitted(SourceRecord record) throws InterruptedException {
// This space intentionally left blank.
}
}
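
For illustration, here is a minimal sketch of a connector task written against the renamed hooks. The class name, the comments about an external system, and all behavior are hypothetical; only the overridden method signatures come from the patched SourceTask API above.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical connector task; names and behavior are illustrative only.
public class QueueSourceTask extends SourceTask {

    @Override
    public void start(Map<String, String> props) {
        // connect to the external system
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // read messages from the external system and convert them to SourceRecords
        return Collections.emptyList();
    }

    @Override
    public void recordCommitted(SourceRecord record) {
        // The Kafka record derived from this SourceRecord has been acknowledged by Kafka
        // (or dropped by a transformation), so it can no longer be lost by this worker.
    }

    @Override
    public void offsetsCommitted(SourceRecord record) {
        if (record == null) {
            return; // a commit round ran, but there was nothing to flush
        }
        // Offsets for every record polled up to and including this one are now durable
        // in offset storage; the external system could safely acknowledge that data here.
    }

    @Override
    public void stop() {
        // signal poll() to return promptly
    }

    @Override
    public String version() {
        return "0.0.0";
    }
}
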
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index adf1582..d819a9e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -59,6 +59,7 @@ class WorkerSourceTask extends WorkerTask {
private KafkaProducer<byte[], byte[]> producer;
private final OffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
+ private SourceRecord offsetLastWrittenFor;
private final Time time;
private List<SourceRecord> toSend;
@@ -189,7 +190,7 @@ class WorkerSourceTask extends WorkerTask {
final SourceRecord record = transformationChain.apply(preTransformRecord);
if (record == null) {
- commitTaskRecord(preTransformRecord);
+ notifyRecordCommitted(preTransformRecord);
continue;
}
@@ -210,6 +211,7 @@ class WorkerSourceTask extends WorkerTask {
}
// Offsets are converted & serialized in the OffsetWriter
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
+ offsetLastWrittenFor = record;
}
}
try {
@@ -231,7 +233,7 @@ class WorkerSourceTask extends WorkerTask {
log.trace("Wrote record successfully: topic {} partition {} offset {}",
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
- commitTaskRecord(preTransformRecord);
+ notifyRecordCommitted(preTransformRecord);
}
recordSent(producerRecord);
}
@@ -251,9 +253,9 @@ class WorkerSourceTask extends WorkerTask {
return true;
}
- private void commitTaskRecord(SourceRecord record) {
+ private void notifyRecordCommitted(SourceRecord record) {
try {
- task.commitRecord(record);
+ task.recordCommitted(record);
} catch (InterruptedException e) {
log.error("Exception thrown", e);
} catch (Throwable t) {
@@ -284,6 +286,8 @@ class WorkerSourceTask extends WorkerTask {
long started = time.milliseconds();
long timeout = started + commitTimeoutMs;
+ SourceRecord offsetLastWrittenFor;
+
synchronized (this) {
// First we need to make sure we snapshot everything in exactly the current state. This
// means both the current set of messages we're still waiting to finish, stored in this
@@ -292,6 +296,7 @@ class WorkerSourceTask extends WorkerTask {
// OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
flushing = true;
boolean flushStarted = offsetWriter.beginFlush();
+ offsetLastWrittenFor = this.offsetLastWrittenFor;
// Still wait for any producer records to flush, even if there aren't any offsets to write
// to persistent storage
@@ -325,7 +330,7 @@ class WorkerSourceTask extends WorkerTask {
log.debug("Finished {} offset commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
- commitSourceTask();
+ notifyOffsetsCommitted(null);
return true;
}
}
@@ -371,14 +376,14 @@ class WorkerSourceTask extends WorkerTask {
log.info("Finished {} commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
- commitSourceTask();
+ notifyOffsetsCommitted(offsetLastWrittenFor);
return true;
}
- private void commitSourceTask() {
+ private void notifyOffsetsCommitted(SourceRecord record) {
try {
- this.task.commit();
+ this.task.offsetsCommitted(record);
} catch (InterruptedException ex) {
log.warn("Commit interrupted", ex);
} catch (Throwable t) {
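
The local snapshot of offsetLastWrittenFor taken inside the synchronized block above pairs the notification with exactly the offsets captured by beginFlush(), even if the sending thread keeps writing newer offsets while the flush is in flight. A self-contained sketch of that pattern follows; the class and method names here are illustrative stand-ins, not code from the patch.

import org.apache.kafka.connect.source.SourceRecord;

// Illustrative stand-in for the worker's commit path; not part of the patch.
class OffsetFlushSnapshotSketch {
    private SourceRecord offsetLastWrittenFor;  // updated by the record-sending thread
    private boolean somethingToFlush;

    synchronized void recordOffsetWritten(SourceRecord record) {
        offsetLastWrittenFor = record;
        somethingToFlush = true;
    }

    void commitOffsets() {
        SourceRecord snapshot;
        boolean flushStarted;
        synchronized (this) {
            flushStarted = somethingToFlush;  // stands in for offsetWriter.beginFlush()
            somethingToFlush = false;
            snapshot = offsetLastWrittenFor;  // captured together with the flush snapshot
        }
        // ... wait for outstanding producer records and flush offsets to storage ...
        notifyOffsetsCommitted(flushStarted ? snapshot : null);
    }

    private void notifyOffsetsCommitted(SourceRecord record) {
        // the real worker would call task.offsetsCommitted(record) here
    }
}
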
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 0436265..ec1404e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -122,7 +122,7 @@ public class VerifiableSourceTask extends SourceTask {
}
@Override
- public void commitRecord(SourceRecord record) throws InterruptedException {
+ public void recordCommitted(SourceRecord record) throws InterruptedException {
Map<String, Object> data = new HashMap<>();
data.put("name", name);
data.put("task", id);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8c07ba1..96b6163 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -640,7 +640,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
expect.andAnswer(expectResponse);
// 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
- expectTaskCommitRecord(anyTimes, succeed);
+ expectRecordCommitted(anyTimes, succeed);
return sent;
}
@@ -677,8 +677,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
});
}
- private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException {
- sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+ private void expectRecordCommitted(boolean anyTimes, boolean succeed) throws InterruptedException {
+ sourceTask.recordCommitted(EasyMock.anyObject(SourceRecord.class));
IExpectationSetters expect = EasyMock.expectLastCall();
if (!succeed) {
expect = expect.andThrow(new RuntimeException("Error committing record in source task"));
@@ -706,7 +706,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
IExpectationSetters futureGetExpect = EasyMock.expect(
flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
if (succeed) {
- sourceTask.commit();
+ sourceTask.offsetsCommitted(EasyMock.anyObject(SourceRecord.class));
EasyMock.expectLastCall();
futureGetExpect.andReturn(null);
} else {