Details
Description
Having discussed this with rhauch, it has been my assumption that SourceTask#stop() will be called by the Kafka Connect framework in case an exception has been raised in poll(). That's not the case, though. As an example see the connector and task below.
Calling stop() after an exception in poll() seems like a very useful action to take, as it'll allow the task to clean up any resources such as releasing any database connections, right after that failure and not only once the connector is stopped.
package com.example; import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; public class TestConnector extends SourceConnector { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public Class<? extends Task> taskClass() { return TestTask.class; } @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.singletonList(Collections.singletonMap("foo", "bar")); } @Override public void stop() { } @Override public ConfigDef config() { return new ConfigDef(); } public static class TestTask extends SourceTask { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public List<SourceRecord> poll() throws InterruptedException { throw new RuntimeException(); } @Override public void stop() { System.out.println("stop() called"); } } }
Attachments
Issue Links
- is related to
-
KAFKA-10792 Source tasks can block herder thread by hanging during stop
-
- Resolved
-
- links to