Kafka / KAFKA-6566

SourceTask#stop() not called after exception raised in poll()

      Description

      After discussing this with Randall Hauch, I had assumed that the Kafka Connect framework calls SourceTask#stop() when an exception is raised in poll(). That is not the case, though. As an example, see the connector and task below.

      Calling stop() after an exception in poll() would be useful: it would let the task release resources, such as database connections, right after the failure rather than only once the connector is stopped.

      package com.example;
      
      import java.util.Collections;
      import java.util.List;
      import java.util.Map;
      
      import org.apache.kafka.common.config.ConfigDef;
      import org.apache.kafka.connect.connector.Task;
      import org.apache.kafka.connect.source.SourceConnector;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.apache.kafka.connect.source.SourceTask;
      
      public class TestConnector extends SourceConnector {
      
          @Override
          public String version() {
              return null;
          }
      
          @Override
          public void start(Map<String, String> props) {
          }
      
          @Override
          public Class<? extends Task> taskClass() {
              return TestTask.class;
          }
      
          @Override
          public List<Map<String, String>> taskConfigs(int maxTasks) {
              return Collections.singletonList(Collections.singletonMap("foo", "bar"));
          }
      
          @Override
          public void stop() {
          }
      
          @Override
          public ConfigDef config() {
              return new ConfigDef();
          }
      
          public static class TestTask extends SourceTask {
      
              @Override
              public String version() {
                  return null;
              }
      
              @Override
              public void start(Map<String, String> props) {
              }
      
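              @Override
              public List<SourceRecord> poll() throws InterruptedException {
                  // Simulate a failure while polling the source system.
                  throw new RuntimeException();
              }
      
              @Override
              public void stop() {
                  // Expected right after the exception in poll(), but not observed at that point.
                  System.out.println("stop() called");
              }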
          }
      }
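
The behaviour requested above can be sketched without any Kafka dependency. The following is a minimal illustration, not the Connect framework's actual worker code: `MiniTask`, `FailingTask`, and `runTask` are hypothetical names invented for this example, and the loop assumes that putting stop() in a finally block is an acceptable way to guarantee cleanup after a failed poll().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT the Kafka Connect classes.
class StopOnFailureSketch {

    /** Minimal task contract, loosely modelled on the start/poll/stop lifecycle. */
    interface MiniTask {
        void start();
        List<String> poll() throws InterruptedException;
        void stop();
    }

    /** Task that records its lifecycle calls and fails on the first poll(). */
    static class FailingTask implements MiniTask {
        final List<String> events = new ArrayList<>();

        @Override
        public void start() {
            events.add("start");
        }

        @Override
        public List<String> poll() {
            events.add("poll");
            throw new RuntimeException("simulated failure in poll()");
        }

        @Override
        public void stop() {
            events.add("stop");
        }
    }

    /**
     * Hypothetical worker loop: stop() sits in a finally block, so it runs
     * whether poll() returns normally, throws, or the thread is interrupted.
     * Returns the exception that ended the loop, or null if there was none.
     */
    static Throwable runTask(MiniTask task, int maxPolls) {
        Throwable failure = null;
        task.start();
        try {
            for (int i = 0; i < maxPolls; i++) {
                task.poll();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            failure = e;
        } catch (RuntimeException e) {
            failure = e;
        } finally {
            task.stop(); // cleanup happens right after the failure
        }
        return failure;
    }

    public static void main(String[] args) {
        FailingTask task = new FailingTask();
        Throwable failure = runTask(task, 3);
        System.out.println("events  = " + task.events);
        System.out.println("failure = " + failure.getMessage());
    }
}
```

With `FailingTask`, the recorded events are start, poll, stop, so the task gets its chance to release resources immediately after the exception. Until the framework behaves this way, a task could presumably achieve a similar effect by catching the exception inside its own poll(), releasing its resources, and rethrowing.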
      
      

              People

              • Assignee: Matthias J. Sax (mjsax)
              • Reporter: Gunnar Morling (gunnar.morling)
              • Reviewer: Ewen Cheslack-Postava
              • Votes: 1
              • Watchers: 9
