Description
We are experiencing significant, yet intermittent, non-deterministic and so far unexplained message loss in a Kafka Streams topology while performing a KTable-KTable foreign-key (FK) Left Join.
Assume the following snippet:
streamsBuilder
    .table(
        folderTopicName,
        Consumed.with(folderKeySerde, folderSerde))
    .leftJoin(
        agencies, // KTable<AgencyId, AggregateAgency>
        Folder::agencyIdValue,
        AggregateFolder::new,
        TableJoined.as("folder-to-agency"),
        Materialized
            .as("folder-to-agency-materialized")
            .withKeySerde(folderKeySerde)
            .withValueSerde(aggregateFolderSerde))
    .leftJoin(
        documents,
        ...
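For illustration, here is a minimal, self-contained sketch of the same join shape. The topic names ("folders", "agencies", "folders-aggregated"), the plain String serdes and value types, and the class name FkLeftJoinSketch are simplified placeholders, not our actual types; the real application uses the domain serdes and classes shown above.

import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class FkLeftJoinSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Right-hand side: agencies keyed by agencyId (plain Strings for simplicity).
        KTable<String, String> agencies = builder.table(
                "agencies",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Foreign-key extractor: derives the agencyId from the folder value.
        // Here the whole value is treated as the FK; the real extractor is Folder::agencyIdValue.
        Function<String, String> agencyIdExtractor = folderValue -> folderValue;

        // ValueJoiner: for a left join the right-hand value may be null.
        ValueJoiner<String, String, String> joiner = (folder, agency) -> folder + "|" + agency;

        // Left-hand side: folders keyed by folderId, FK-left-joined against agencies.
        builder.table(
                    "folders",
                    Consumed.with(Serdes.String(), Serdes.String()))
               .leftJoin(
                    agencies,
                    agencyIdExtractor,
                    joiner,
                    TableJoined.as("folder-to-agency"),
                    Materialized.as("folder-to-agency-materialized"))
               .toStream()
               .to("folders-aggregated");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-left-join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}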
The setup is as follows:
A Debezium Connector for PostgreSQL streams database changes into various Kafka topics. A series of Quarkus Kafka Streams applications then performs aggregation operations on those topics to create index documents that are later sent to an OpenSearch system.
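For context, each of these Quarkus applications exposes its topology to the quarkus-kafka-streams extension via a CDI producer, roughly as sketched below; the class name is illustrative, and depending on the Quarkus version the annotations come from javax.enterprise or jakarta.enterprise.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

// Illustrative only: with the quarkus-kafka-streams extension, the application
// exposes its topology as a CDI-produced Topology bean.
@ApplicationScoped
public class FolderTopologyProducer {

    @Produces
    public Topology buildTopology() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // ... table definitions and the FK Left Joins shown above ...
        return streamsBuilder.build();
    }
}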
When firing up the Kafka Streams infrastructure against initially populated Kafka topics (i.e. a snapshot of all relevant table data has already been streamed to Kafka), the KTable-KTable FK Left Join shown above seems to lose messages on the first of a series of FK Left Joins. The right-hand KTable<AgencyId, AggregateAgency> is consumed from an aggregated topic fed by another Kafka Streams topology/application.
On a (heavily reduced) test data set of 6828 messages in the folderTopicName Topic, we observe the following results:
- folder-to-agency-subscription-registration: 6967 messages
- folder-to-agency-subscription-response: 3048 messages
- folder-to-agency-subscription-store-changelog: 6359 messages
- folder-to-agency-materialized-changelog: 4644 messages
Given the nature of a (FK) Left Join, I'd expect every message from the left-hand topic to produce an aggregate even if no matching message is found in the right-hand topic, i.e. the joiner should still be invoked with a null right-hand value.
The message loss varies unpredictably across test runs and does not appear to be bound to specific keys or messages.
So far, this can only be observed when initially firing up the Streams infrastructure to process the message 'backlog' that had been snapshotted by Debezium. A manual snapshot triggered later (i.e. with the Streams applications already running) does not seem to show this behaviour. Additionally, we have so far observed this kind of message loss only when running multiple replicas of the affected application; with only one replica, everything seems to work as expected. We tried leveraging group.initial.rebalance.delay.ms to rule out possible rebalancing issues, but to no avail.
Our Kafka configuration:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
message.max.bytes: "20971520"
Our Kafka Streams application configuration:
kafka-streams.num.stream.threads: 5
kafka-streams.num.standby.replicas: 1
kafka-streams.auto.offset.reset: earliest
kafka-streams.cache.max.bytes.buffering: "20971520"
kafka-streams.commit.interval.ms: 100
kafka-streams.fetch.max.bytes: "10485760"
kafka-streams.max.request.size: "10485760"
kafka-streams.max.partition.fetch.bytes: "10485760"
kafka-streams.metadata.max.age.ms: 300000
kafka-streams.statestore.cache.max.bytes: "20971520"
kafka-streams.topology.optimization: all
kafka-streams.processing.guarantee: exactly_once_v2
# Kafka Streams Intermediate Topics
kafka-streams.topic.compression.type: lz4
kafka-streams.topic.segment.ms: "43200000" # 12h
kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
kafka-streams.topic.delete.retention.ms: "86400000" # 24h
kafka-streams.producer.max.request.size: "20971520" # 20MiB
kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
kafka-streams.consumer.group.instance.id: ${HOSTNAME}
kafka-streams.consumer.heartbeat.interval.ms: 100
kafka-streams.consumer.session.timeout.ms: 45000
All input (and aggregate) topics feature 15 partitions and share this configuration:
cleanup.policy: compact
compression.type: lz4
segment.ms: "43200000" # 12h
max.compaction.lag.ms: "86400000" # 24h
delete.retention.ms: "86400000" # 24h
Logs show no indication of where or why this happens.
The issue was discussed on the Kafka mailing list as well as on StackOverflow, but neither thread led to a further explanation. In the end, it was suggested that I file a bug in the Kafka JIRA. I can't rule out that this is entirely caused by some setting in our Kafka environment, but there are other indications of similar message loss on FK Join operations. For the time being, I'd consider this a bug, perhaps one that only emerges under certain conditions.
At the moment, I have no test case that reproduces the issue locally.
In case any additional information is needed, I'd be happy to provide it.