Kafka / KAFKA-8816

RecordCollector offsets updated indirectly by StreamTask


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
    • Component/s: streams
    • Labels:
      None

      Description

      Currently it is possible to update the offsets in RecordCollectorImpl indirectly through its read accessor, offsets():

      @Override
      public Map<TopicPartition, Long> offsets() {
          return offsets;
      } 

      Here, offsets is a private final field in RecordCollectorImpl. The intent appears to be that this field is updated only when the producer acknowledges a send. However, because the accessor returns the map itself in mutable form, callers can modify it through this call, and StreamTask does exactly that today:

      protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
          final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
          for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
              checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
          }
      
          return checkpointableOffsets;
      }

      Here a new checkpoint entry is written into the collector's own map whenever the topic partition is not already present, which happens for the input topic when using optimized topologies with a KTable. Because putIfAbsent never overwrites an existing entry, later calls leave that first value in place, so the task keeps checkpointing the first offset seen rather than the latest one.
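A minimal, self-contained sketch of this effect, assuming a simplified model: StaleCheckpointDemo is a hypothetical class (not Kafka's API) that folds the collector's map and the task's consumed offsets into one object, and String keys such as "input-0" stand in for TopicPartition. Its checkpointableOffsets() method mirrors the shape of activeTaskCheckpointableOffsets above:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the buggy interaction (not Kafka code).
// String keys such as "input-0" stand in for TopicPartition.
class StaleCheckpointDemo {
    // Stands in for RecordCollectorImpl's private offsets field.
    private final Map<String, Long> collectorOffsets = new HashMap<>();
    // Offsets consumed from the input topic so far.
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    // Returns the internal map itself, as the buggy accessor does.
    Map<String, Long> collectorOffsets() {
        return collectorOffsets;
    }

    // Records the latest consumed offset for a partition.
    void consume(final String partition, final long offset) {
        consumedOffsets.put(partition, offset);
    }

    // Same shape as activeTaskCheckpointableOffsets(): it mutates the map it
    // was handed, so consumed offsets leak into the collector's state.
    Map<String, Long> checkpointableOffsets() {
        final Map<String, Long> checkpointable = collectorOffsets();
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            // putIfAbsent never replaces the entry written on the first call.
            checkpointable.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointable;
    }
}
```

Consuming offset 5 and checkpointing yields 5 for "input-0"; consuming offset 10 and checkpointing again still yields 5, and the entry now lives in what is supposed to be the collector's private map.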

      The correct behavior seems to be for RecordCollectorImpl to return a read-only view of its offsets, and for activeTaskCheckpointableOffsets to copy the returned map before mutating it.
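A sketch of the proposed fix under the same simplifying assumptions: SafeCollector and SafeTask are hypothetical names, and String keys again stand in for TopicPartition, while Collections.unmodifiableMap and the HashMap copy constructor are standard java.util APIs. The collector hands out a read-only view, and the task copies it before merging consumed offsets:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical collector that exposes only a read-only view of its offsets.
class SafeCollector {
    private final Map<String, Long> offsets = new HashMap<>();

    // Simulates the producer acknowledging a send at the given offset;
    // this is the only path that writes to the internal map.
    void onAck(final String partition, final long offset) {
        offsets.put(partition, offset);
    }

    // Callers can read the view; any write through it throws
    // UnsupportedOperationException.
    Map<String, Long> offsets() {
        return Collections.unmodifiableMap(offsets);
    }
}

// Hypothetical task that copies the view before merging consumed offsets.
class SafeTask {
    private final SafeCollector collector = new SafeCollector();
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    SafeCollector collector() {
        return collector;
    }

    void consume(final String partition, final long offset) {
        consumedOffsets.put(partition, offset);
    }

    Map<String, Long> checkpointableOffsets() {
        // Copy first, so the collector's state is never touched.
        final Map<String, Long> checkpointable = new HashMap<>(collector.offsets());
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            // Acknowledged offsets still take precedence over consumed ones.
            checkpointable.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointable;
    }
}
```

With this version, the same sequence (consume 5, checkpoint, consume 10, checkpoint) yields 10 on the second checkpoint, the collector's map stays empty, and writing through offsets() throws UnsupportedOperationException. The design trades one map copy per checkpoint for a simple ownership rule: only the acknowledgement path writes to the collector's map.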


              People

              • Assignee:
                cpettitt-confluent Chris Pettitt
                Reporter:
                cpettitt-confluent Chris Pettitt
              • Votes:
                0
                Watchers:
                5
