Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Duplicate
- Affects Version/s: 1.18.0
- Fix Version/s: None
- Component/s: None
- Labels: None
Description
Hello,
We have been running autoscaling experiments with Apache Flink 1.18.0 and encountered a bug in Kafka source split assignment.
The issue manifested as follows: after a sudden spike in traffic, the autoscaler upscaled the Kafka source vertex, but the Kafka source fetchers then failed to retrieve all available Kafka partitions. In addition, we observed duplicated source splits; for example, taskmanager-1 and taskmanager-4 both fetched the same Kafka partition.
taskmanager-9 2024-01-09 17:59:37 [Source: kafka_source_input_with_kt -> Flat Map -> session_valid (5/18)#6] INFO o.a.f.c.b.s.r.SourceReaderBase - Adding split(s) to reader: [[Partition: sf-enriched-4, StartingOffset: 26084169, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-8, StartingOffset: 46477069, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-9, StartingOffset: 46121324, StoppingOffset: -9223372036854775808], [Partition: sf-enriched-5, StartingOffset: 26221751, StoppingOffset: -9223372036854775808]]
taskmanager-6 2024-01-09 17:59:37 [Source: kafka_source_input_with_kt -> Flat Map -> session_valid (4/18)#6] INFO o.a.f.c.b.s.r.SourceReaderBase - Adding split(s) to reader: [[Partition: sf-enriched-4, StartingOffset: 26084169, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-8, StartingOffset: 46477069, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-20, StartingOffset: 46211745, StoppingOffset: -9223372036854775808], [Partition: sf-anonymized-32, StartingOffset: 46340878, StoppingOffset: -9223372036854775808]]
As these logs show, taskmanager-9 and taskmanager-6 have both fetched the partitions sf-enriched-4 and sf-anonymized-8.
Additional Questions
- During other experiments that also led to Kafka partition issues, we noticed that the autoscaler attempted to set the source vertex parallelism to a value that is not a divisor of the Kafka topic's partition count. For example, it recommended a parallelism of 48 when the total partition count was 72. In such scenarios:
- Is the Kafka source connector vertex still expected to work correctly when its parallelism is not a divisor of the topic's partition count?
- If that configuration is not ideal, should the autoscaler have a mechanism that ensures the source vertex parallelism always evenly divides the topic's partition count?
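For context on the divisor question: Flink's KafkaSourceEnumerator assigns each partition to a reader with a modulo computation (the real implementation also mixes in a hash of the topic name as a start offset, omitted in this sketch). A minimal model, not Flink's actual code, showing why a parallelism of 48 over 72 partitions produces a 2x read skew rather than an outright failure:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSkew {
    // Simplified model of modulo-based split-to-reader assignment
    // (ignores the per-topic start-index offset used by KafkaSourceEnumerator).
    static Map<Integer, Integer> assignmentCounts(int partitions, int parallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            int reader = p % parallelism;
            counts.merge(reader, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = assignmentCounts(72, 48);
        int min = counts.values().stream().min(Integer::compare).get();
        int max = counts.values().stream().max(Integer::compare).get();
        // 72 partitions over 48 readers: 24 readers own 2 partitions, 24 own 1.
        System.out.println("min=" + min + " max=" + max);
    }
}
```

Under this model every partition is still assigned exactly once, so a non-divisor parallelism should only skew load; the duplication above points at the rescaling/state bug instead.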
Attachments
Issue Links
- duplicates FLINK-34063: When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost (Resolved)
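Since the linked duplicate ties the lost and duplicated splits to snapshot compression, one hedged workaround until the fix is picked up is to disable compression in flink-conf.yaml (this assumes compression had been explicitly enabled in the affected deployment, as it defaults to false):

```yaml
# flink-conf.yaml: disable state snapshot compression to avoid the
# source-split rescaling bug tracked in FLINK-34063.
execution.checkpointing.snapshot-compression: false
```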