diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 9caca05..91a606f 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -53,24 +53,26 @@ public abstract class SourceTask implements Task {

     /**
      * <p>
-     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
-     * method should block until the commit is complete.
+     * Notification that the offsets derived from all records returned by {@link #poll()} up to and including a
+     * given {@link SourceRecord} have been written and flushed to offset storage. No further offsets will be
+     * written until this call returns.
      * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
-     * automatically. This hook is provided for systems that also need to store offsets internally
-     * in their own system.
+     * automatically. This hook is provided for systems that need to react to such an event.
      * </p>
+     *
+     * @param record The last polled {@link SourceRecord} whose offsets were successfully written and flushed.
      */
-    public void commit() throws InterruptedException {
+    public void offsetsCommitted(SourceRecord record) throws InterruptedException {
         // This space intentionally left blank.
     }

     /**
      * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
      * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
-     * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and
-     * {@link #commit()}.
+     * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()},
+     * {@link #recordCommitted(SourceRecord)} and {@link #offsetsCommitted(SourceRecord)}.
      *
      * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
      * could set a flag that will force {@link #poll()} to exit immediately and invoke
@@ -80,18 +82,19 @@ public abstract class SourceTask implements Task {

     /**
      * <p>
-     * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
+     * Notification that the outgoing Kafka record derived from a given {@link SourceRecord} has been successfully
+     * sent to and acknowledged by Kafka, or that the given {@link SourceRecord} has been filtered out by a transformation.
      * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
-     * automatically. This hook is provided for systems that also need to store offsets internally
-     * in their own system.
+     * automatically. This hook is provided for systems that need to react to such an event.
      * </p>
      *
-     * @param record {@link SourceRecord} that was successfully sent via the producer.
+     * @param record The {@link SourceRecord} that was successfully sent to and acknowledged by Kafka. May be null,
+     *               indicating a commit round in which there was nothing to commit.
      * @throws InterruptedException
      */
-    public void commitRecord(SourceRecord record) throws InterruptedException {
+    public void recordCommitted(SourceRecord record) throws InterruptedException {
         // This space intentionally left blank.
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index adf1582..d819a9e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -59,6 +59,7 @@ class WorkerSourceTask extends WorkerTask {
     private KafkaProducer producer;
     private final OffsetStorageReader offsetReader;
     private final OffsetStorageWriter offsetWriter;
+    private SourceRecord offsetLastWrittenFor;
     private final Time time;

     private List toSend;
@@ -189,7 +190,7 @@ class WorkerSourceTask extends WorkerTask {

             final SourceRecord record = transformationChain.apply(preTransformRecord);
             if (record == null) {
-                commitTaskRecord(preTransformRecord);
+                notifyRecordCommitted(preTransformRecord);
                 continue;
             }

@@ -210,6 +211,7 @@ class WorkerSourceTask extends WorkerTask {
                 }
                 // Offsets are converted & serialized in the OffsetWriter
                 offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
+                offsetLastWrittenFor = record;
             }
         }
         try {
@@ -231,7 +233,7 @@ class WorkerSourceTask extends WorkerTask {
                                 log.trace("Wrote record successfully: topic {} partition {} offset {}",
                                         recordMetadata.topic(), recordMetadata.partition(),
                                         recordMetadata.offset());
-                                commitTaskRecord(preTransformRecord);
+                                notifyRecordCommitted(preTransformRecord);
                             }
                             recordSent(producerRecord);
                         }
@@ -251,9 +253,9 @@ class WorkerSourceTask extends WorkerTask {
         return true;
     }

-    private void commitTaskRecord(SourceRecord record) {
+    private void notifyRecordCommitted(SourceRecord record) {
         try {
-            task.commitRecord(record);
+            task.recordCommitted(record);
         } catch (InterruptedException e) {
             log.error("Exception thrown", e);
         } catch (Throwable t) {
@@ -284,6 +286,8 @@ class WorkerSourceTask extends WorkerTask {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;

+        SourceRecord offsetLastWrittenFor;
+
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
             // class's outstandingMessages field, and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
             flushing = true;
             boolean flushStarted = offsetWriter.beginFlush();
+            offsetLastWrittenFor = this.offsetLastWrittenFor;

             // Still wait for any producer records to flush, even if there aren't any offsets to write
             // to persistent storage
@@ -325,7 +330,7 @@ class WorkerSourceTask extends WorkerTask {
                 log.debug("Finished {} offset commitOffsets successfully in {} ms",
                         this, time.milliseconds() - started);
-                commitSourceTask();
+                notifyOffsetsCommitted(null);
                 return true;
             }
         }
@@ -371,14 +376,14 @@ class WorkerSourceTask extends WorkerTask {
         log.info("Finished {} commitOffsets successfully in {} ms",
                 this, time.milliseconds() - started);
-        commitSourceTask();
+        notifyOffsetsCommitted(offsetLastWrittenFor);
         return true;
     }

-    private void commitSourceTask() {
+    private void notifyOffsetsCommitted(SourceRecord record) {
         try {
-            this.task.commit();
+            this.task.offsetsCommitted(record);
         } catch (InterruptedException ex) {
             log.warn("Commit interrupted", ex);
         } catch (Throwable t) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 0436265..ec1404e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -122,7 +122,7 @@ public class VerifiableSourceTask extends SourceTask {
     }

     @Override
-    public void commitRecord(SourceRecord record) throws InterruptedException {
+    public void recordCommitted(SourceRecord record) throws InterruptedException {
         Map data = new HashMap<>();
         data.put("name", name);
         data.put("task", id);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8c07ba1..96b6163 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -640,7 +640,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expect.andAnswer(expectResponse);

         // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
-        expectTaskCommitRecord(anyTimes, succeed);
+        expectRecordCommitted(anyTimes, succeed);

         return sent;
     }
@@ -677,8 +677,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         });
     }

-    private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException {
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+    private void expectRecordCommitted(boolean anyTimes, boolean succeed) throws InterruptedException {
+        sourceTask.recordCommitted(EasyMock.anyObject(SourceRecord.class));
         IExpectationSetters expect = EasyMock.expectLastCall();
         if (!succeed) {
             expect = expect.andThrow(new RuntimeException("Error committing record in source task"));
@@ -706,7 +706,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         IExpectationSetters futureGetExpect = EasyMock.expect(
                 flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
         if (succeed) {
-            sourceTask.commit();
+            sourceTask.offsetsCommitted(EasyMock.anyObject(SourceRecord.class));
             EasyMock.expectLastCall();
             futureGetExpect.andReturn(null);
         } else {
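For reviewers who want to see the renamed hooks from a connector's point of view, here is a minimal sketch of a task written against this patch: it acknowledges individual upstream messages from recordCommitted(SourceRecord) and checkpoints the source system from offsetsCommitted(SourceRecord). UpstreamClient, the "sequence" offset key, the topic name, and the constructor injection are placeholders invented for the example; only the SourceTask overrides and the SourceRecord constructor are the real Connect API.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/**
 * Illustrative only: UpstreamClient and the "sequence" offset key are invented for this
 * sketch; the overridden methods match the signatures introduced by this patch.
 */
public class AckingSourceTask extends SourceTask {

    /** Placeholder for whatever client talks to the external system; not a real API. */
    public interface UpstreamClient {
        String receive() throws InterruptedException; // blocking read; returns null when woken up
        void ack(long sequence);                       // acknowledge a single message
        void checkpoint(long sequence);                // mark everything up to sequence as durable
        void wakeup();                                 // unblock a pending receive()
    }

    private final UpstreamClient client;
    private final AtomicLong sequence = new AtomicLong();

    // A real task would build its client in start() from the connector config;
    // constructor injection is used here only to keep the sketch short.
    public AckingSourceTask(UpstreamClient client) {
        this.client = client;
    }

    @Override
    public String version() {
        return "0.1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Connection setup for the placeholder client would go here.
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        String body = client.receive();
        if (body == null)
            return null; // woken up by stop(); nothing to hand to the framework
        Map<String, ?> sourcePartition = Collections.singletonMap("source", "example-queue");
        Map<String, ?> sourceOffset = Collections.singletonMap("sequence", sequence.incrementAndGet());
        return Collections.singletonList(
                new SourceRecord(sourcePartition, sourceOffset, "example-topic", Schema.STRING_SCHEMA, body));
    }

    @Override
    public void recordCommitted(SourceRecord record) {
        // The Kafka record derived from this SourceRecord was acknowledged by Kafka
        // (or dropped by a transformation), so the single upstream message can be acked.
        client.ack((Long) record.sourceOffset().get("sequence"));
    }

    @Override
    public void offsetsCommitted(SourceRecord record) {
        // Offsets for everything polled up to and including this record have been flushed.
        // WorkerSourceTask passes null when a commit round had nothing to flush.
        if (record != null)
            client.checkpoint((Long) record.sourceOffset().get("sequence"));
    }

    @Override
    public void stop() {
        // May run on a different thread than poll() and the callbacks, per the SourceTask javadoc.
        client.wakeup();
    }
}

Since WorkerSourceTask invokes offsetsCommitted(null) when a commit round finds nothing to flush, implementations should tolerate a null record, as the sketch does.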