  Kafka / KAFKA-12857

Using Connect Sink with CooperativeStickyAssignor results in commit offsets failure


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.7.1
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None
    • Environment: Linux

    Description

      We are attempting to use a Kafka Connect sink connector with the CooperativeStickyAssignor assignment strategy. When we use CooperativeStickyAssignor, offset commits sometimes fail with:

      [2021-05-26 22:03:36,435] WARN WorkerSinkTask{id=sink-connector-7} Ignoring invalid task provided offset mytopic-0/OffsetAndMetadata{offset=16305575, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:434)

      Note that the partition the warning reports as not assigned (mytopic-0) is in fact listed in the assignment.

      Config changes

      consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
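Assuming this line is set in the Connect worker configuration, the `consumer.` prefix marks settings that the worker passes on to the consumers it creates for sink tasks, with the prefix removed, so the line above sets `partition.assignment.strategy` on those consumers. The stdlib-only sketch below illustrates just that prefix-stripping step; `ConsumerPrefixSketch` and `withPrefixStripped` are hypothetical names, not the Connect runtime's code, and `localhost:9092` is a placeholder.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ConsumerPrefixSketch {
    // Hypothetical helper: returns every property whose key starts with the
    // given prefix, with the prefix removed from the key.
    static Map<String, String> withPrefixStripped(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties worker = new Properties();
        // Not prefixed, so this helper does not copy it.
        worker.setProperty("bootstrap.servers", "localhost:9092");
        worker.setProperty("consumer.partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Prints the single stripped entry for partition.assignment.strategy.
        System.out.println(withPrefixStripped(worker, "consumer."));
    }
}
```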

      Cooperative vs Eager Assignment Strategy background
      https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics

      With eager assignment:

      Listener#onPartitionsAssigned: called on the full set of assigned partitions (may have overlap with the partitions passed to #onPartitionsRevoked)

      With cooperative assignment:

      Listener#onPartitionsAssigned: called on the subset of assigned partitions that were not previously owned before this rebalance. There should be no overlap with the revoked partitions (if any). This will always be called, even if there are no new partitions being assigned to a given member.

      This means that with cooperative assignment, `onPartitionsAssigned` may be called with a partial assignment (only the newly added partitions) or with an empty collection.
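To make the difference concrete, the sketch below models which partitions each protocol passes to `onPartitionsAssigned` for a single member. It is a simplified, hypothetical model, not Kafka client code: partitions are plain strings, and the extra rebalance that cooperative rebalancing uses to move a partition from one member to another is ignored.

```java
import java.util.Set;
import java.util.TreeSet;

public class RebalanceCallbackSketch {
    // Eager: onPartitionsAssigned receives the member's full new assignment
    // (everything previously owned was revoked first, so 'owned' is unused).
    static Set<String> eagerAssigned(Set<String> owned, Set<String> target) {
        return new TreeSet<>(target);
    }

    // Cooperative: onPartitionsAssigned receives only partitions in the new
    // assignment that the member did not already own; this may be empty.
    static Set<String> cooperativeAssigned(Set<String> owned, Set<String> target) {
        Set<String> added = new TreeSet<>(target);
        added.removeAll(owned);
        return added;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("mytopic-0");
        Set<String> target = Set.of("mytopic-0");
        System.out.println("eager:       " + eagerAssigned(owned, target));
        System.out.println("cooperative: " + cooperativeAssigned(owned, target));
    }
}
```

For a member that keeps `mytopic-0` across a rebalance, the eager model yields `[mytopic-0]` while the cooperative model yields `[]`.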

      However, the WorkerSinkTask.HandleRebalance class assumes that `onPartitionsAssigned` is called with the full set of assigned partitions, which holds for eager assignment but not for cooperative assignment.

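      WorkerSinkTask.java (inner class HandleRebalance)
              public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                  log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
                  // Both maps are replaced on every call, so entries for partitions that are
                  // still owned are lost when this is called with only newly added partitions
                  // (or with an empty collection), as happens under cooperative rebalancing.
                  lastCommittedOffsets = new HashMap<>();
                  currentOffsets = new HashMap<>();
                  for (TopicPartition tp : partitions) {
                      long pos = consumer.position(tp);
                      lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                      currentOffsets.put(tp, new OffsetAndMetadata(pos));
                      log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
                  }
                  // ... (remainder of the method omitted)
              }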

      On every call, onPartitionsAssigned replaces lastCommittedOffsets and currentOffsets with new, empty HashMaps and then adds only the offsets of the partitions passed to it.

      In the logs we see:
      [2021-05-26 22:02:09,785] DEBUG WorkerSinkTask{id=sink-connector-7} Partitions assigned [myTopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)
      [2021-05-26 22:02:13,063] DEBUG WorkerSinkTask{id=sink-connector-7} Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)
      [2021-05-26 22:02:16,074] DEBUG WorkerSinkTask{id=sink-connector-7} Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)

      These logs show that, with CooperativeStickyAssignor, onPartitionsAssigned is first called with the partition assigned to the task and is then called again with an empty collection on later rebalances that assign no new partitions to it.

      When HandleRebalance.onPartitionsAssigned is called first with the assigned partition and then with empty collections, the last call leaves lastCommittedOffsets (and currentOffsets) set to an empty HashMap, even though the task still owns the partition.
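The effect of that call sequence can be reproduced with a stripped-down model of the listener logic quoted above. In this hypothetical sketch, partitions are strings, offsets are `Long` values, and `consumer.position(tp)` is replaced by a fixed placeholder position of 100.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResetOnAssignSketch {
    // Stand-ins for WorkerSinkTask's fields, keyed by partition name strings.
    Map<String, Long> lastCommittedOffsets = new HashMap<>();
    Map<String, Long> currentOffsets = new HashMap<>();

    // Same structure as the quoted listener: both maps are replaced on every call.
    void onPartitionsAssigned(List<String> partitions) {
        lastCommittedOffsets = new HashMap<>();
        currentOffsets = new HashMap<>();
        for (String tp : partitions) {
            long pos = 100L; // placeholder for consumer.position(tp)
            lastCommittedOffsets.put(tp, pos);
            currentOffsets.put(tp, pos);
        }
    }

    public static void main(String[] args) {
        ResetOnAssignSketch task = new ResetOnAssignSketch();
        // Same sequence as the DEBUG logs: the partition, then two empty calls.
        task.onPartitionsAssigned(List.of("mytopic-0"));
        task.onPartitionsAssigned(List.of());
        task.onPartitionsAssigned(List.of());
        System.out.println("lastCommittedOffsets = " + task.lastCommittedOffsets);
    }
}
```

After the three calls, both maps are empty even though `mytopic-0` is still assigned.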

      Inside WorkerSinkTask.commitOffsets, commitableOffsets is initialized as a copy of lastCommittedOffsets, which is now an empty HashMap:

      WorkerSinkTask.java
      private void commitOffsets(long now, boolean closing) {
              // ... (earlier part of the method omitted)
              // Starts from a copy of lastCommittedOffsets: if that map is empty,
              // no task-provided offset passes the containsKey check below.
              final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
              for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
                  final TopicPartition partition = taskProvidedOffsetEntry.getKey();
                  final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
                  if (commitableOffsets.containsKey(partition)) {
                      long taskOffset = taskProvidedOffset.offset();
                      long currentOffset = currentOffsets.get(partition).offset();
                      if (taskOffset <= currentOffset) {
                          commitableOffsets.put(partition, taskProvidedOffset);
                      } else {
                          log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
                                  this, partition, taskProvidedOffset, taskOffset, currentOffset);
                      }
                  } else {
                      log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
                              this, partition, taskProvidedOffset, consumer.assignment());
                  }
              }
              // ... (remainder of the method omitted)
      }

      The check commitableOffsets.containsKey(partition) is false because commitableOffsets is an empty HashMap copied from the empty lastCommittedOffsets HashMap. As a result, the task-provided offset is skipped and the "Ignoring invalid task provided offset ... -- partition not assigned" warning is logged, even though the task is assigned the partition.
      For example:
      [2021-05-26 22:03:36,435] WARN WorkerSinkTask{id=sink-connector-7} Ignoring invalid task provided offset mytopic-0/OffsetAndMetadata{offset=16305575, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:434)
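A simplified, stdlib-only model of the filtering loop in commitOffsets shows why the offset is dropped. `CommitFilterSketch` and `committable` are hypothetical names; partitions are strings, offsets are `Long` values, and the warnings are printed instead of logged.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitFilterSketch {
    // Keeps a task-provided offset only if its partition is a key of
    // lastCommittedOffsets and the offset does not exceed the consumed
    // position recorded in currentOffsets.
    static Map<String, Long> committable(Map<String, Long> lastCommittedOffsets,
                                         Map<String, Long> currentOffsets,
                                         Map<String, Long> taskProvidedOffsets) {
        Map<String, Long> result = new HashMap<>(lastCommittedOffsets);
        for (Map.Entry<String, Long> e : taskProvidedOffsets.entrySet()) {
            String partition = e.getKey();
            long taskOffset = e.getValue();
            if (result.containsKey(partition)) {
                long currentOffset = currentOffsets.get(partition);
                if (taskOffset <= currentOffset) {
                    result.put(partition, taskOffset);
                } else {
                    System.out.println("Ignoring " + partition + " -- not yet consumed");
                }
            } else {
                System.out.println("Ignoring " + partition + " -- partition not assigned");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // State left behind by an empty onPartitionsAssigned call.
        Map<String, Long> empty = new HashMap<>();
        Map<String, Long> provided = Map.of("mytopic-0", 16305575L);
        System.out.println(committable(empty, empty, provided));
    }
}
```

With both maps empty, the offset for `mytopic-0` is reported as "partition not assigned" and nothing is left to commit.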

      Recommended Changes
      When initializing lastCommittedOffsets, WorkerSinkTask.HandleRebalance.onPartitionsAssigned needs to handle the cooperative assignment strategy, under which onPartitionsAssigned may be called with only a subset of the assigned partitions, or with an empty collection if no new partitions are assigned to a given member.
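One possible direction, sketched under assumptions, is to update the offset maps incrementally instead of replacing them: add entries for newly assigned partitions and remove entries only for revoked partitions. This is a hypothetical model with string partitions and a placeholder `position` function standing in for `consumer.position(tp)`. It is not the fix adopted in Kafka, and it leaves out everything else the real listener does on revocation, such as committing offsets.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

public class IncrementalAssignSketch {
    final Map<String, Long> lastCommittedOffsets = new HashMap<>();
    final Map<String, Long> currentOffsets = new HashMap<>();
    // Hypothetical stand-in for consumer.position(tp).
    final ToLongFunction<String> position;

    IncrementalAssignSketch(ToLongFunction<String> position) {
        this.position = position;
    }

    // Adds only the partitions passed in; entries for partitions that are
    // still owned are left untouched, so an empty call changes nothing.
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String tp : partitions) {
            long pos = position.applyAsLong(tp);
            lastCommittedOffsets.put(tp, pos);
            currentOffsets.put(tp, pos);
        }
    }

    // Drops state only for the partitions that are actually revoked.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String tp : partitions) {
            lastCommittedOffsets.remove(tp);
            currentOffsets.remove(tp);
        }
    }

    public static void main(String[] args) {
        IncrementalAssignSketch task = new IncrementalAssignSketch(tp -> 100L);
        task.onPartitionsAssigned(List.of("mytopic-0"));
        task.onPartitionsAssigned(List.of());
        task.onPartitionsAssigned(List.of());
        System.out.println("lastCommittedOffsets = " + task.lastCommittedOffsets);
    }
}
```

Under eager rebalancing, all owned partitions are revoked before the full assignment is passed to `onPartitionsAssigned`, so the same incremental bookkeeping still ends with the complete assignment.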

       


    People

      Assignee: Unassigned
      Reporter: Oliver Hsu (oliverhsu77)