KAFKA-2886

WorkerSinkTask doesn't catch exceptions from rebalance callbacks

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0.0
    • Component/s: KafkaConnect
    • Labels:
      None

      Description

      WorkerSinkTask exposes rebalance callbacks to tasks by invoking onPartitionsRevoked and onPartitionsAssigned on the task. However, these calls aren't guarded by try/catch blocks, so exceptions thrown by the task propagate up to the consumer:

      [2015-11-24 15:52:24,071] ERROR User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on partition assignment: (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
      java.lang.UnsupportedOperationException
      at java.util.Collections$UnmodifiableCollection.clear(Collections.java:1094)
      at io.confluent.connect.hdfs.DataWriter.onPartitionsAssigned(DataWriter.java:207)
      at io.confluent.connect.hdfs.HdfsSinkTask.onPartitionsAssigned(HdfsSinkTask.java:103)
      at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:369)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:189)
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:227)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:306)
      at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:861)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
      at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
      at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
      at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
      [2015-11-24 15:52:24,477] INFO Cannot acquire lease on WAL hdfs://worker4:9000/logs/test/0/log (io.confluent.connect.hdfs.wal.FSWAL)

      This currently works for onPartitionsAssigned only because the callback is the last thing invoked. For onPartitionsRevoked, the exception causes offsets not to be committed and the message batch currently being processed not to be cleared. Additionally, we may need to do more to clean up, e.g. the task may need to stop processing data entirely, since it may now be in a bad state.
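
      The guard described above could look roughly like the following. This is a minimal sketch, not the actual WorkerSinkTask code or the fix that was committed: RebalanceHooks and GuardedRebalanceListener are hypothetical stand-ins with no Kafka dependency, the commit is modelled as a counter, and partitions are plain strings. Whether offsets should still be committed after a failed callback is itself an assumption made here for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the task-side rebalance hooks (not the Kafka Connect API).
interface RebalanceHooks {
    void onPartitionsAssigned(Collection<String> partitions);
    void onPartitionsRevoked(Collection<String> partitions);
}

// Hypothetical wrapper that keeps task exceptions from escaping into the consumer.
class GuardedRebalanceListener {
    private final RebalanceHooks task;
    private final List<String> batch = new ArrayList<>(); // stand-in for the current message batch
    private int commits = 0;                              // stand-in for offset commits
    private RuntimeException failure;                     // first task failure, if any

    GuardedRebalanceListener(RebalanceHooks task) {
        this.task = task;
    }

    void buffer(String record) {
        batch.add(record);
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        try {
            task.onPartitionsAssigned(partitions);
        } catch (RuntimeException e) {
            recordFailure(e); // remember the error instead of throwing into the consumer
        }
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        try {
            task.onPartitionsRevoked(partitions);
        } catch (RuntimeException e) {
            recordFailure(e);
        } finally {
            // Assumption: cleanup must run even when the task callback fails.
            commits++;
            batch.clear();
        }
    }

    private void recordFailure(RuntimeException e) {
        if (failure == null) {
            failure = e;
        }
    }

    // The worker loop can check this and stop processing if the task is in a bad state.
    boolean failed() {
        return failure != null;
    }

    int commits() {
        return commits;
    }

    int batchSize() {
        return batch.size();
    }
}
```

      With a wrapper of this shape, the worker loop can check failed() after each poll and stop the task, rather than relying on the consumer to surface the error.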

            People

            • Assignee: hachikuji Jason Gustafson
            • Reporter: ewencp Ewen Cheslack-Postava