Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-34096

Upscale of kafka source operator leads to some splits getting lost

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Duplicate
    • 1.18.0
    • None
    • None
    • None

    Description

      Hello,

      We've been conducting experiments with Autoscaling in Apache Flink version 1.18.0 and encountered a bug associated with the Kafka source split.

      The issue manifested in our system as follows: upon experiencing a sudden spike in traffic, the autoscaler opted to upscale the Kafka source vertex. However, the Kafka source fetcher failed to retrieve all available Kafka partitions. Additionally, we observed duplication in source splits. For example, taskmanager-1 and taskmanager-4 both fetched the same Kafka partition.

      taskmanager-9 2024-01-09 17:59:37 [Source: kafka_source_input_with_kt -> Flat Map -> session_valid (5/18)#6] INFO  o.a.f.c.b.s.r.SourceReaderBase - Adding split(s) to reader: [[Partition: sf-enriched-4, StartingOffset: 26084169, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-8, StartingOffset: 46477069, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-9, StartingOffset: 46121324, StoppingOffset: -9223372036854775808], [Partition: sf-enriched-5, StartingOffset: 26221751, StoppingOffset: -9223372036854775808]] 
      
      taskmanager-6 2024-01-09 17:59:37 [Source: kafka_source_input_with_kt -> Flat Map -> session_valid (4/18)#6] INFO  o.a.f.c.b.s.r.SourceReaderBase - Adding split(s) to reader: [[Partition: sf-enriched-4, StartingOffset: 26084169, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-8, StartingOffset: 46477069, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-20, StartingOffset: 46211745, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-32, StartingOffset: 46340878, StoppingOffset: -9223372036854775808]] 

      You can see in these logs, taskmanager-9 and taskmanager-6 has both fetched partition sf-enriched-4 and sf-anonymized-8

      Additional Questions

      • During some other experiments which also lead to kafka partition issues, we noticed that the autoscaler attempted to increase the parallelism of the source vertex to a value that is not a divisor of the Kafka topic's partition count. For example, it recommended a parallelism of 48 when the total partition count was 72. In such scenarios: 
        • Does kafka source connector vertex still suppose to works well when its parallelism is not divisor of topic's partition count?
        • If this configuration is not ideal, should there be a mechanism within the autoscaler to ensure that the parallelism of the source vertex always matches the topic's partition count?

       

       

       

       

       

       

      Attachments

        1. image-2024-01-15-15-46-47-104.png
          40 kB
          Yang LI
        2. image-2024-01-15-15-47-36-509.png
          556 kB
          Yang LI
        3. image-2024-01-15-15-48-07-871.png
          538 kB
          Yang LI
        4. source-split-log.txt
          9 kB
          Yang LI
        5. substate-log.txt
          39 kB
          Yang LI
        6. global-configuration-log.txt
          58 kB
          Yang LI

        Issue Links

          Activity

            People

              Unassigned Unassigned
              yang Yang LI
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: