KAFKA-17635

Lost events on internal repartition topic when exactly_once_v2 is set and producer is fenced


Details

    Description

      In some of our Kafka Streams applications we observed that events were lost during processing when the processing guarantee was set to exactly_once_v2.
       
      It happened in different Kafka Streams applications and at different places. The common pattern is that an internal repartition topic was always involved (e.g. FK joins and aggregations on a new key).

      With the following simplified example we could reproduce the problem:

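      // String() and Long() are assumed to be static imports from org.apache.kafka.common.serialization.Serdes
      inputStream
        .groupBy((k, v) -> v, Grouped.with(String(), String()).withName("group"))
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
          .withKeySerde(String()).withValueSerde(Long()));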
      

      The analysis showed the following:

      • the event exists in the input topic
      • after repartitioning, the changelog topic does not always contain all events in the aggregate.

      It happens only occasionally in the production environment, while processing millions of events during the initial load.

      We were able to reproduce the problem in a local environment in debugging mode, but only rarely.

      Our assumption is that there is a problem with the purging of events from the repartition topic.
      The StreamTask holds a list of consumedOffsets (used for purging internal repartition topics).
      After a TaskMigratedException (e.g. caused by a transaction timeout or similar), the stream task is migrated and closed dirty.
      When the task is restored, the consumedOffsets list is not cleared.
      The consumedOffsets list may therefore still contain offsets from aborted transactions.
      On the next purge cycle, records at offsets that have not yet been committed might get deleted from the repartition topic.
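
The suspected sequence can be illustrated with a toy model. This is only a sketch of the hypothesis above and not Kafka Streams code: the classes RepartitionLog and Task, their fields and the clearConsumedOffset switch are all hypothetical stand-ins, and the model assumes that a purge runs after the dirty close and before the aborted records are reprocessed.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the suspected purge problem; none of these types exist in Kafka Streams. */
public class PurgeAfterAbortSketch {

    /** Hypothetical stand-in for one repartition topic partition: offset -> record. */
    static final class RepartitionLog {
        final TreeMap<Long, String> records = new TreeMap<>();

        /** Removes every record with an offset below the given one. */
        void deleteBefore(long offset) {
            records.headMap(offset, false).clear();
        }
    }

    /** Hypothetical stand-in for a task that tracks consumed offsets for purging. */
    static final class Task {
        long position = 0;           // next offset to read
        long committedPosition = 0;  // position covered by the last committed transaction
        long consumedOffset = -1;    // highest offset read so far, used only for purging
        int pendingCount = 0;        // records processed in the open transaction
        int committedCount = 0;      // records whose processing was committed

        void process(RepartitionLog log, int maxRecords) {
            for (int i = 0; i < maxRecords; i++) {
                Map.Entry<Long, String> next = log.records.ceilingEntry(position);
                if (next == null) {
                    return; // nothing left to read
                }
                consumedOffset = next.getKey();
                position = next.getKey() + 1;
                pendingCount++;
            }
        }

        void commit() {
            committedCount += pendingCount;
            pendingCount = 0;
            committedPosition = position;
        }

        /** Models the aborted transaction: uncommitted work is dropped and the position rewinds. */
        void closeDirty(boolean clearConsumedOffset) {
            pendingCount = 0;
            position = committedPosition;
            if (clearConsumedOffset) {
                consumedOffset = committedPosition - 1;
            }
        }

        void purge(RepartitionLog log) {
            log.deleteBefore(consumedOffset + 1);
        }
    }

    /** Runs the scenario with 10 input records and returns how many were counted in the end. */
    static int run(boolean clearConsumedOffset) {
        RepartitionLog log = new RepartitionLog();
        for (long offset = 0; offset < 10; offset++) {
            log.records.put(offset, "event-" + offset);
        }
        Task task = new Task();
        task.process(log, 5);                 // offsets 0..4
        task.commit();
        task.purge(log);                      // safe: only committed offsets are purged
        task.process(log, 3);                 // offsets 5..7, transaction still open
        task.closeDirty(clearConsumedOffset); // producer fenced, transaction aborted
        task.purge(log);                      // purge based on consumedOffset
        task.process(log, 10);                // resume from the committed position
        task.commit();
        return task.committedCount;
    }

    public static void main(String[] args) {
        System.out.println("stale consumed offset kept: " + run(false) + " of 10 events counted");
        System.out.println("consumed offset reset:      " + run(true) + " of 10 events counted");
    }
}
```

In this model the run that keeps the stale consumed offset counts 7 of 10 events, because offsets 5 to 7 are purged before they can be reprocessed, while the run that resets it counts all 10.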

      2024-09-27T11:35:10.021+02:00  WARN 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
      
      org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to commit a transaction [stream-thread [main]]; it means all tasks belonging to this thread should be migrated.
      	at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:304) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1875) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1842) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1337) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:986) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.1.jar:na]
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) ~[kafka-streams-3.7.1.jar:na]
      Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
      	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1689) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339) ~[kafka-clients-3.7.1.jar:na]
      	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253) ~[kafka-clients-3.7.1.jar:na]
      	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
      
      2024-09-27T11:35:10.021+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask   : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Suspended from RUNNING
      2024-09-27T11:35:11.420+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask   : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Closed dirty
      2024-09-27T11:37:06.782+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.ProcessorStateManager        : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] stream-task [1_3] State store count did not find checkpoint offset, hence would default to the starting offset at changelog processTest-1-count-changelog-3
      2024-09-27T11:37:06.783+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask   : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Initialized
      2024-09-27T11:37:06.787+02:00  INFO 38644 --- [sandbox] [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store count in regular mode
      2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.StoreChangelogReader         : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] End offset for changelog processTest-1-count-changelog-3 initialized as 916.
      2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Assigned to partition(s): processTest-1-count-changelog-3, processTest-1-count-changelog-1
      2024-09-27T11:37:06.843+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Seeking to earliest offset of partition processTest-1-count-changelog-1
      2024-09-27T11:37:06.844+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Resetting offset for partition processTest-1-count-changelog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9093 (id: 2 rack: null)], epoch=0}}.
      2024-09-27T11:37:06.850+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.StoreChangelogReader         : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Finished restoring changelog processTest-1-count-changelog-3 to store count with a total number of 456 records
      2024-09-27T11:37:06.851+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask   : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Restored and ready to run
      2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Restoration took 334 ms for all active tasks [0_3, 1_3, 0_1, 1_1]
      2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] State transition from PARTITIONS_ASSIGNED to RUNNING
      2024-09-27T11:37:06.854+02:00  INFO 38644 --- [sandbox] [-StreamThread-4] org.apache.kafka.streams.KafkaStreams    : stream-client [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7] State transition from REBALANCING to RUNNING
      

      In our test we produced the same number of events to each of the 4 partitions.

      In the sample test we just count the events, therefore all 4 partitions should eventually have the same count.
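
A check of this expectation could look like the following sketch. The helper name missingPerPartition and the numbers in main are made up for illustration and are not taken from the test run described here; the observed totals per partition are assumed to have been collected separately, for example by reading the count store or the changelog topic.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that compares observed per-partition totals with the expected total. */
public class PartitionCountCheck {

    /** Returns partition -> number of missing events for every partition below the expected total. */
    static Map<Integer, Long> missingPerPartition(Map<Integer, Long> observed, long expectedPerPartition) {
        Map<Integer, Long> missing = new TreeMap<>();
        observed.forEach((partition, count) -> {
            if (count < expectedPerPartition) {
                missing.put(partition, expectedPerPartition - count);
            }
        });
        return missing;
    }

    public static void main(String[] args) {
        // Made-up totals: partition 3 is short by three events.
        Map<Integer, Long> observed = Map.of(0, 1000L, 1, 1000L, 2, 1000L, 3, 997L);
        System.out.println(missingPerPartition(observed, 1000L)); // prints {3=3}
    }
}
```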

      Our current workaround would be to temporarily increase transaction.timeout.ms to a very high value.
      This should reduce the probability of tasks being migrated.
      However, this is not really a solution.
      Another option would be to increase repartition.purge.interval.ms to a very high value in order to effectively disable the purging of repartition topics during the initial load.
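
As a sketch, the two workarounds could be expressed as configuration overrides like this. The concrete values (10 minutes and 24 hours) and the class and method names are illustrative assumptions, not recommendations from this report. Note that the broker setting transaction.max.timeout.ms (15 minutes by default) caps the transaction timeout a producer may request.

```java
import java.time.Duration;
import java.util.Properties;

/** Illustrative overrides for the two workarounds; the values are assumptions. */
public class WorkaroundConfigSketch {

    static Properties workaroundOverrides() {
        Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once_v2");
        // Workaround 1: longer transaction timeout, so that tasks are migrated less often.
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms",
                String.valueOf(Duration.ofMinutes(10).toMillis()));
        // Workaround 2: purge repartition topics so rarely that purging is
        // effectively off during the initial load.
        props.setProperty("repartition.purge.interval.ms",
                String.valueOf(Duration.ofHours(24).toMillis()));
        return props;
    }

    public static void main(String[] args) {
        workaroundOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be merged into the application's existing Streams configuration. Both settings only make the suspected sequence less likely; neither removes the stale offsets.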

      Attachments

        1. screenshot-1.png
          107 kB
          Herbert Wespi

          People

            bbejeck Bill Bejeck
            herbert.wespi Herbert Wespi