  Apache Storm / STORM-3337

KafkaTridentSpoutOpaque can lose offset data in Zookeeper


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.2.3

    Description

      I see this issue happening once or twice a week for a number of topologies I have running in production that read from Kafka. I was able to reproduce it in a more pared-down environment using 1 worker with a parallelism hint of at least 2 reading from a topic with 16 partitions. The issue is reproducible with fewer partitions, but it occurs less frequently.

      What happens is that while committing offsets for the first transaction after a worker crash, the partition offset data in Zookeeper is wiped for a subset of that worker's partitions. The state is restored after the next batch or two is committed and the worker continues as normal. However, if the worker happens to crash a second time before the data is restored, it is lost and those partitions reset (in my case to their earliest offsets, since I use a reset strategy of UNCOMMITTED_EARLIEST) after the worker restarts again.

      I've attached a log file showing what's happening. In this example ZK had offset data committed for all partitions for txid=29 before the worker crashed. After the worker came back up, partitions 1, 3, 5, 7, 9, 11, 13 and 15 lost their child nodes in ZK, while the remaining partitions got child nodes created for txid=30. The important steps are:

      1. Thread-7 and Thread-19 both hit 'Emitted Batch' for txid=30 here:

      https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146
      Thread-7 is assigned the even-numbered partitions 0-14 and has `_index=0`, `_changedMeta=true`, `coordinatorMeta=[]`
      Thread-19 is assigned the odd-numbered partitions 1-15 and has `_index=1`, `_changedMeta=true`, `coordinatorMeta=[]`
      Even though `coordinatorMeta` is empty, the `KafkaTridentSpoutEmitter` ignores this parameter in `getPartitionsForTask` and returns partition assignments for each thread:
      https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L253

      2. Both threads hit 'Committing transaction' here:
      https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L160

      3. Thread-19 begins creating state for txid=30 for its partitions:
      https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L186

      4. Thread-7 enters this special condition since it has `_index==0` and `_changedMeta==true`:
      https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L169

      5. Thread-7 calls `_emitter.getOrderedPartitions(_savedCoordinatorMeta)`, which for the `KafkaTridentSpoutEmitter` implementation returns an empty list since `coordinatorMeta` was empty for this batch:
      https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L241

      6. Thread-7 lists all the partition nodes for this component in ZK, and since `validIds` is empty they all pass the check and `removeState` is called on each of them for txid=30:
      https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L177
      This is the key part that causes the bug. Thread-7 hasn't started committing state for txid=30 for any of its assigned partitions, so the calls to `removeState` on those partitions don't do anything. However, Thread-19 is running concurrently, so any partitions it has already written state for get deleted here (a minimal replay of this interleaving is sketched after step 7).

      7. Thread-7 and Thread-19 enter the success handler and call `cleanupBefore(txid=29)`:
      https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L153
      For the partitions owned by Thread-19 that had their state removed in step 6, this means the nodes for both txid=29 and txid=30 are deleted and the partitions' states are left empty in ZK.
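
      To make the interleaving concrete, here is a minimal, single-threaded replay of the ordering described in steps 3-7, using a plain in-memory map in place of the per-partition ZooKeeper nodes. Everything in it (the class name, the `offsets@...` placeholders) is hypothetical and only mirrors the behaviour described above; it is not Storm code.

      ```java
      import java.util.*;

      public class LostOffsetsSketch {
          public static void main(String[] args) {
              // State left behind by the crashed worker: every partition has a child node for txid=29.
              Map<Integer, TreeMap<Long, String>> zk = new TreeMap<>();
              for (int p = 0; p < 16; p++) {
                  TreeMap<Long, String> children = new TreeMap<>();
                  children.put(29L, "offsets@29");
                  zk.put(p, children);
              }
              long txid = 30L;

              // Step 3 (Thread-19): commit txid=30 state for its odd partitions.
              for (int p = 1; p < 16; p += 2) {
                  zk.get(p).put(txid, "offsets@30");
              }

              // Steps 4-6 (Thread-7): _index==0 and _changedMeta==true, but getOrderedPartitions()
              // came back empty, so validIds is empty and every existing partition node fails the
              // membership check and has removeState(txid=30) applied to it -- including the odd
              // partitions Thread-19 just wrote.
              Set<Integer> validIds = Collections.emptySet();
              for (int p : zk.keySet()) {
                  if (!validIds.contains(p)) {
                      zk.get(p).remove(txid); // no-op for the even partitions, destructive for the odd ones
                  }
              }

              // After the sweep Thread-7 commits txid=30 for its own (even) partitions,
              // which is why those partitions end up with txid=30 child nodes in the log.
              for (int p = 0; p < 16; p += 2) {
                  zk.get(p).put(txid, "offsets@30");
              }

              // Step 7 (both threads): the success handler cleans up the older txid=29 children.
              // For the odd partitions this deletes the only child they had left.
              for (TreeMap<Long, String> children : zk.values()) {
                  children.headMap(txid).clear();
              }

              // Odd partitions print as {} -- no offset data left; a second crash now resets them.
              zk.forEach((p, children) -> System.out.println("partition " + p + " -> " + children));
          }
      }
      ```

      Running it prints empty state for the odd partitions and only txid=30 for the even ones, matching the end state described above from the attached log.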

      A couple of things contribute to this bug. I'm not sure whether it's expected that `coordinatorMeta` be passed into `emitBatch` as an empty list for the first batch after the worker restart. If it contained all the partitions assigned across all tasks (which it does on subsequent batches), then the `validIds` check wouldn't flag partitions owned by other tasks and their state wouldn't accidentally get removed. This is exacerbated by the fact that the `KafkaTridentSpoutEmitter` returns valid partitions from `getPartitionsForTask` even when `getOrderedPartitions` returns an empty list, which seems to break the executor's expectations (see the invariant sketch below).
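
      That mismatch can be stated as an invariant the `validIds` check in step 6 implicitly relies on: every partition a task can get back from `getPartitionsForTask` should also appear in `getOrderedPartitions` for the same `coordinatorMeta`. The following check is a hypothetical sketch against a simplified stand-in interface (not the real Storm emitter API); it would fail for the first post-restart batch, where `getOrderedPartitions` returns an empty list while `getPartitionsForTask` still hands out real Kafka partitions.

      ```java
      import java.util.*;

      /**
       * Hypothetical invariant check over a simplified stand-in interface, not the real
       * IOpaquePartitionedTridentSpout emitter API.
       */
      interface SimplifiedEmitter<P> {
          List<P> getOrderedPartitions(Object coordinatorMeta);
          List<P> getPartitionsForTask(int taskIndex, int numTasks, Object coordinatorMeta);
      }

      final class EmitterInvariantCheck {
          static <P> void assertTaskPartitionsAreOrdered(SimplifiedEmitter<P> emitter,
                                                         Object coordinatorMeta,
                                                         int numTasks) {
              Set<P> ordered = new HashSet<>(emitter.getOrderedPartitions(coordinatorMeta));
              for (int task = 0; task < numTasks; task++) {
                  Set<P> extra = new HashSet<>(emitter.getPartitionsForTask(task, numTasks, coordinatorMeta));
                  extra.removeAll(ordered);
                  if (!extra.isEmpty()) {
                      // This is the situation on the first batch after the restart: coordinatorMeta is
                      // empty, getOrderedPartitions() returns [], yet tasks are still handed real Kafka
                      // partitions, which task 0 then treats as stale state and removes.
                      throw new IllegalStateException(
                              "task " + task + " got partitions not present in getOrderedPartitions: " + extra);
                  }
              }
          }
      }
      ```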

      One additional safeguard that might be worth investigating is having `cleanupBefore` make sure it isn't going to leave a node in ZK without any children before removing anything (a sketch follows below).
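
      A sketch of that safeguard, written against a plain sorted set of committed txids rather than the real `RotatingTransactionalState`/ZK API, so it is an illustration of the idea rather than a patch:

      ```java
      import java.util.*;

      /** Hypothetical guard: never let a cleanup sweep delete a partition node's last child. */
      final class CleanupBeforeGuard {
          /** Returns the txids that are safe to delete when cleaning up everything before {@code txid}. */
          static SortedSet<Long> cleanableBefore(SortedSet<Long> committedTxids, long txid) {
              SortedSet<Long> candidates = new TreeSet<>(committedTxids.headSet(txid));
              if (!candidates.isEmpty() && candidates.size() == committedTxids.size()) {
                  // Deleting all of these would leave the partition node with no children at all,
                  // so hold on to the newest candidate as a fallback.
                  candidates.remove(candidates.last());
              }
              return candidates;
          }
      }
      ```

      Whatever the exact bound semantics of the real `cleanupBefore`, the idea is the same: the sweep should never remove the last remaining child of a partition node.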

      Attachments

        1. lost_offsets.log
          52 kB
          Jonathan Munz

            People

              Assignee: Unassigned
              Reporter: Jonathan Munz
              Votes: 0
              Watchers: 2
