KAFKA-6566

SourceTask#stop() not called after exception raised in poll()

      Description

      After discussing this with Randall Hauch, I had assumed that the Kafka Connect framework calls SourceTask#stop() when poll() raises an exception. That is not the case, though. As an example, see the connector and task below.

      Calling stop() after an exception in poll() would be very useful: it would let the task clean up its resources, such as open database connections, right after the failure rather than only once the connector is stopped.

      package com.example;
      
      import java.util.Collections;
      import java.util.List;
      import java.util.Map;
      
      import org.apache.kafka.common.config.ConfigDef;
      import org.apache.kafka.connect.connector.Task;
      import org.apache.kafka.connect.source.SourceConnector;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.apache.kafka.connect.source.SourceTask;
      
      public class TestConnector extends SourceConnector {
      
          @Override
          public String version() {
              return null;
          }
      
          @Override
          public void start(Map<String, String> props) {
          }
      
          @Override
          public Class<? extends Task> taskClass() {
              return TestTask.class;
          }
      
          @Override
          public List<Map<String, String>> taskConfigs(int maxTasks) {
              return Collections.singletonList(Collections.singletonMap("foo", "bar"));
          }
      
          @Override
          public void stop() {
          }
      
          @Override
          public ConfigDef config() {
              return new ConfigDef();
          }
      
          public static class TestTask extends SourceTask {
      
              @Override
              public String version() {
                  return null;
              }
      
              @Override
              public void start(Map<String, String> props) {
              }
      
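              @Override
              public List<SourceRecord> poll() throws InterruptedException {
                  // Simulate a failure while polling for records.
                  throw new RuntimeException();
              }
      
              @Override
              public void stop() {
                  // Expected to print right after poll() fails; as reported,
                  // the framework does not call stop() at that point.
                  System.out.println("stop() called");
              }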
          }
      }
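
The requested behavior can be sketched as a run loop that calls stop() in a finally block, so cleanup happens whether poll() returns or throws. This is only a minimal illustration and not Kafka Connect's actual worker code: SimpleTask, FailingTask, runTask and the "null ends the loop" convention are hypothetical names and assumptions made up for this sketch, and it has no Kafka dependency so that it can be run on its own.

```java
import java.util.List;

public class StopOnFailureSketch {

    /** Hypothetical stand-in for a task lifecycle; NOT the Kafka Connect API. */
    interface SimpleTask {
        void start();
        List<String> poll();
        void stop();
    }

    /** A task whose poll() always fails, mirroring TestTask in the report. */
    static class FailingTask implements SimpleTask {
        boolean stopped = false;

        @Override
        public void start() {
        }

        @Override
        public List<String> poll() {
            throw new RuntimeException("simulated failure in poll()");
        }

        @Override
        public void stop() {
            // A real task would release connections and other resources here.
            stopped = true;
            System.out.println("stop() called");
        }
    }

    /**
     * Runs the task until poll() returns null (an assumption of this sketch)
     * or throws. Returns true on normal completion, false on failure.
     */
    static boolean runTask(SimpleTask task) {
        task.start();
        try {
            while (task.poll() != null) {
                // A real worker would hand the records to a producer here.
            }
            return true;
        } catch (RuntimeException e) {
            return false;
        } finally {
            // Runs on both paths, so the task can always clean up.
            task.stop();
        }
    }

    public static void main(String[] args) {
        FailingTask task = new FailingTask();
        boolean ok = runTask(task);
        System.out.println("completed normally: " + ok + ", stopped: " + task.stopped);
    }
}
```

With a loop shaped like this, the failing task gets its stop() call immediately after the exception, which is the behavior this issue asks for.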
      
      

            People

            • Assignee: Robert Yokota (rayokota)
            • Reporter: Gunnar Morling (gunnar.morling)
            • Reviewer: Ewen Cheslack-Postava
            • Votes: 1
            • Watchers: 9
