Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-16700

Kafka Streams: possible message loss on KTable-KTable FK Left Join

Attach filesAttach ScreenshotAdd voteVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.7.0
    • None
    • streams
    • Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators

    Description

      We are experiencing significant, yet intermittent / non-deterministic / unexplainable message loss on a Kafka Streams topology while performing a KTable-KTable FK Left Join.

      Assume the following snippet:

      streamsBuilder
          .table(
              folderTopicName,
              Consumed.with(
                  folderKeySerde,
                  folderSerde))
              .leftJoin(
                  agencies, // KTable<AgencyId, AggregateAgency>
                  Folder::agencyIdValue,
                  AggregateFolder::new,
                  TableJoined.as("folder-to-agency"),
                  Materialized
                      .as("folder-to-agency-materialized")
                      .withKeySerde(folderKeySerde)
                      .withValueSerde(aggregateFolderSerde))
              .leftJoin(
                  documents,
      

      The setup is as follows:

      A Debezium Connector for PostgreSQL streams database changes into various Kafka topics. A series of Quarkus Kafka Streams applications then performs aggregation operations on those topics to create index documents later to be sent into an OpenSearch system.

      When firing up the Kafka Streams infrastructure to work on initially populated Kafka Topics (i.e. a snapshot of all relevant table data has been streamed to Kafka), the above shown KTable-KTable FK Left Join seems to produce message loss on the first of a series of FK Left Joins; the right hand KTable<AgencyId, AggregateAgency> is consumed from an aggregated topic fed from another Kafka Streams topology / application.

      On a (heavily reduced) test data set of 6828 messages in the folderTopicName Topic, we observe the following results:

      • folder-to-agency-subscription-registration: 6967 messages
      • folder-to-agency-subscription-response: 3048 messages
      • folder-to-agency-subscription-store-changelog: 6359 messages
      • folder-to-agency-materialized-changelog: 4644 messages.

      Telling from the nature of a (FK) Left Join, I'd expect all messages from the left hand topic should produce an aggregate even if no matching message is found in the right hand topic.

      Message loss unpredictably varies across tests and seems not to be bound to specific keys or messages.

      As it seems, this can only be observed when initially firing up the Streams infrastructure to process the message 'backlog' that had been snapshotted by Debezium. A manual snapshot triggered later (i.e. Streams applications already running) seems not to show this behaviour. Additionally, as of yet we observed this kind of message loss only when running multiple replicas of the affected application. When carrying out the tests with only one replica, everything seems to work as expected. We've tried to leverage group.initial.rebalance.delay.ms in order to rule out possible rebalancing issues, but to no avail.

      Our Kafka configuration:

          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          default.replication.factor: 3
          min.insync.replicas: 2
          message.max.bytes: "20971520"
      

      Our Kafka Streams application configuration:

          kafka-streams.num.stream.threads: 5
          kafka-streams.num.standby.replicas: 1
          kafka-streams.auto.offset.reset: earliest
          kafka-streams.cache.max.bytes.buffering: "20971520"
          kafka-streams.commit.interval.ms: 100
          kafka-streams.fetch.max.bytes: "10485760"
          kafka-streams.max.request.size: "10485760"
          kafka-streams.max.partition.fetch.bytes: "10485760"
          kafka-streams.metadata.max.age.ms: 300000
          kafka-streams.statestore.cache.max.bytes: "20971520"
          kafka-streams.topology.optimization: all
          kafka-streams.processing.guarantee: exactly_once_v2
      
          # Kafka Streams Intermediate Topics
          kafka-streams.topic.compression.type: lz4
          kafka-streams.topic.segment.ms: "43200000" # 12h
          kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
          kafka-streams.topic.delete.retention.ms: "86400000" # 24h
      
          kafka-streams.producer.max.request.size: "20971520" # 20MiB
          kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
      
          kafka-streams.consumer.group.instance.id: ${HOSTNAME}
          kafka-streams.consumer.heartbeat.interval.ms: 100
          kafka-streams.consumer.session.timeout.ms: 45000
      

      All input (and aggregate) topics feature 15 partitions and share this configuration:

      cleanup.policy: compact          
      compression.type: lz4
      segment.ms: "43200000" # 12h
      max.compaction.lag.ms: "86400000" # 24h
      delete.retention.ms: "86400000" # 24h
      

      Logs show no indication of where or why this happens.

      The issue was discussed on the Kafka mailing list as well as on StackOverflow, but both threads lead to no further explanation. In the end, I was suggested to file a bug on the Kafka JIRA. I actually can't rule out that this is entirely based upon some setting in our Kafka environment, but there are other indications of similar message loss on FK Join operations. For the time being, I'd consider this a bug, perhaps emerging only under certain conditions.

      ATM I've no test case to reproduce the issue locally.

      In case any additional information is needed, I'd be happy to provide those.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned
            stoeckmk Karsten Stöckmann

            Dates

              Created:
              Updated:

              Slack

                Issue deployment