KAFKA-9891 (project: Kafka)

Invalid state store content after task migration with exactly_once and standby replicas


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.3.1, 2.5.0, 2.4.1
    • Fix Version/s: 2.6.0, 2.5.1
    • Component/s: streams
    • Labels: None

      Description

      We have a simple command id deduplication mechanism (very similar to the one in the Kafka Streams examples) based on Kafka Streams state stores. It stores the command ids from the past hour in a persistentWindowStore. We encountered a problem with this store when an exception is thrown later in the same topology.
      We run 3 nodes in Docker, each configured with multiple stream threads for this Streams application.
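The deduplication check described above is essentially check-then-put: look the command id up, and if it was not seen within the retention window (one hour here), record it and continue. The following is a minimal, stdlib-only sketch of that logic, assuming a plain `HashMap` stands in for the RocksDB-backed window store. The class and method names are hypothetical; this is not the reporter's code, which is in the Stack Overflow question.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a window-store-based command id deduplicator.
// The real application uses a Kafka Streams persistent window store; this only models the check.
final class CommandIdDeduplicator {
    private final Duration retention;
    // command id -> timestamp (ms) at which it was last recorded
    private final Map<String, Long> seen = new HashMap<>();

    CommandIdDeduplicator(Duration retention) {
        this.retention = retention;
    }

    /** Returns true if the id was recorded within the retention window; otherwise records it and returns false. */
    boolean isDuplicate(String commandId, long timestampMs) {
        Long recordedAt = seen.get(commandId);
        if (recordedAt != null && timestampMs - recordedAt < retention.toMillis()) {
            return true;
        }
        // Not seen, or seen too long ago: record it before any downstream processing runs.
        seen.put(commandId, timestampMs);
        return false;
    }
}
```

Because the id is recorded before any downstream processor runs, a later exception does not remove it from the local store; under exactly_once only the transaction, including the changelog write, is aborted.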

      The business flow is as follows (performed within a single subtopology):

      • A valid command is sent with command id mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f. NODE 1 is running the active task 1_2. The first node in the topology checks whether this is a duplicate by looking it up in the state store (COMMAND_ID_STORE); since it is not, it puts the command id into the state store and processes the command normally.
      • An invalid command is sent with the same key but a new command id (mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc). Again, the duplicate check is performed; it is not a duplicate, so the command id is put into the state store. The next node in the topology throws an exception, which causes an error on NODE 1 for task 1_2. As a result, the transaction is aborted and offsets are not committed. I double-checked the changelog topic: the relevant messages are not committed. Therefore, the changelog topic contains only the first command id, mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f, and not the one that caused the failure.
      • In the meantime, a standby task 1_2 running on NODE 3 replicated command id mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f into its local COMMAND_ID_STORE.
      • Standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It checks whether this command id is a duplicate (it isn't), tries to process the faulty command, and throws an exception. Again, the transaction is aborted; all looks fine.
      • NODE 3 Thread-1 takes over and checks for a duplicate. To our surprise, it is a duplicate, even though the transaction was aborted and the changelog does not contain this command id (mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc).
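The divergence in the second step can be modeled in a few lines. In this hedged, stdlib-only sketch (all names are hypothetical), a `put` is applied to the local store immediately, while the matching changelog record is only buffered in the open transaction. Aborting drops the buffer but, as in the flow above, leaves the local store untouched.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: a non-transactional local store next to a transactional changelog.
final class TaskStateModel {
    final Set<String> localStore = new LinkedHashSet<>();      // stands in for the local window store
    final List<String> committedChangelog = new ArrayList<>(); // what a read_committed reader would see
    private final List<String> pendingChangelog = new ArrayList<>();

    void put(String commandId) {
        localStore.add(commandId);       // applied immediately, not rolled back on abort
        pendingChangelog.add(commandId); // sent as part of the open transaction
    }

    void commit() {
        committedChangelog.addAll(pendingChangelog);
        pendingChangelog.clear();
    }

    void abort() {
        pendingChangelog.clear();        // aborted records never become visible
    }
}
```

After an abort, the local store holds an id the committed changelog does not, so the store can only be trusted again if it is discarded and rebuilt from the committed changelog.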

       

      After digging into the Streams logs and some discussion on Stack Overflow, we concluded that the problem is related to checkpoint files. Here are the detailed logs relevant to checkpoint files:

       

      NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
      NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
      NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
      NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
      NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
      NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
      NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
      NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
      NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
      NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
      NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
      NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
      NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
      NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error: java.lang.RuntimeException
      NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
      NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
      NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
      

      It seems that on NODE_3 a standby task 1_2 runs on T-2 and replicates the first, valid command, thereby creating a checkpoint file. The invalid command causes an error on NODE_1, and NODE_3 T-2 then takes over the task. It finds the checkpoint file (which is fine) and starts processing the invalid command. It crashes, and T-1 on the same node takes over. T-1 finds the checkpoint file and assumes the state store is clean, but it is not: it still contains the state modified by T-2 in the aborted transaction. As a result, T-1 reports the command id as a duplicate.
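The explanation above can be expressed as a small model. In this hypothetical, stdlib-only sketch, restoring with a checkpoint keeps whatever the local store already holds and replays only committed changelog records from the checkpointed position onward, while restoring without a checkpoint (or after wiping the store) rebuilds from the beginning. The names and the simplified offset handling (one changelog record per offset, no transaction markers) are assumptions, not Kafka Streams internals.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of restoring a local store from its changelog, with or without a checkpoint.
final class RestoreModel {
    /**
     * If checkpointOffset is non-null, the existing local contents are trusted and only
     * changelog entries from checkpointOffset onward are replayed (what the logs suggest for
     * NODE_3 T-1). If it is null, the store is rebuilt from an empty state.
     */
    static Set<String> restore(Set<String> localStore, List<String> committedChangelog, Long checkpointOffset) {
        Set<String> result;
        int from;
        if (checkpointOffset != null) {
            result = new HashSet<>(localStore); // keeps writes from the aborted transaction too
            from = (int) Math.min(checkpointOffset, committedChangelog.size());
        } else {
            result = new HashSet<>();           // wiped store
            from = 0;
        }
        for (int i = from; i < committedChangelog.size(); i++) {
            result.add(committedChangelog.get(i));
        }
        return result;
    }
}
```

With a checkpoint present, the id written by the crashed thread survives the restore, matching the "Command duplicate detected" log line; without one it disappears. This is consistent with the workarounds reported below (no standby replicas, or an in-memory store), though the model does not prove the root cause.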

       

      We use Java 11, kafka-clients 2.4.1 and spring-kafka 2.4.5. We temporarily rolled back to kafka-clients 2.3.1, and the problem persisted.

      We performed more tests with configuration changes: after changing `num.standby.replicas = 1` to `num.standby.replicas = 0`, the problem disappeared. It also disappears when the store is changed to an inMemoryWindowStore.
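For reference, the standby-replica workaround corresponds to the Streams settings below. This sketch uses plain `java.util.Properties` with the documented config keys `processing.guarantee` and `num.standby.replicas`; the application id and bootstrap servers are placeholders, and a real application would normally use the `StreamsConfig` constants instead of string literals.

```java
import java.util.Properties;

// Sketch of the workaround configuration; ids and hosts are placeholders.
final class WorkaroundConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "XXXXProcessor");      // placeholder: the real id is masked in the logs
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("processing.guarantee", "exactly_once"); // as in the issue title
        props.put("num.standby.replicas", "0");            // workaround: the bug reproduced with 1
        // The other reported workaround is a store change, not a config key:
        // Stores.inMemoryWindowStore(...) instead of Stores.persistentWindowStore(...).
        return props;
    }
}
```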

      The relevant Java code can be found in the Stack Overflow question. I don't have a sample project that reproduces the problem to share at the moment, but it is easily reproducible in our project.

      Such behaviour can have serious implications for business logic; in our case, messages were accidentally skipped without being properly processed.

        Attachments

        1. failedtest3
          903 kB
          Mateusz Jadczyk
        2. failedtest3_bug
          2 kB
          Mateusz Jadczyk
        3. failedtest2
          857 kB
          Mateusz Jadczyk
        4. failedtest
          824 kB
          Mateusz Jadczyk
        5. state_store_operations.txt
          25 kB
          Mateusz Jadczyk
        6. tasks_assignment.txt
          8 kB
          Mateusz Jadczyk


            People

            • Assignee: Matthias J. Sax (mjsax)
            • Reporter: Mateusz Jadczyk (mateuszjadczyk)
            • Votes: 0
            • Watchers: 6
