Kafka / KAFKA-9503

TopologyTestDriver processes intermediate results in the wrong order



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.5.0, 2.4.1
    • Component/s: streams-test-utils
    • Labels: None


      TopologyTestDriver processes each input synchronously, which resolves one of the most significant challenges with verifying the correctness of streaming applications.

      When processing an input, it feeds the record to the source node, and the record is then passed synchronously (processing is always synchronous within a task) through the subtopology via Context#forward calls. Ultimately, the outputs from that input are forwarded into the RecordCollector, which converts them to Producer.send calls. In TopologyTestDriver, this Producer is a special one that just captures the records.

      Some output topics of one subtopology, such as repartition topics, are inputs to another subtopology. Immediately after the synchronous subtopology process() invocation, TopologyTestDriver iterates over the collected outputs from the special Producer. If they are purely output records, it just enqueues them for later retrieval by testing code. If they are records for internal topics, though, TopologyTestDriver immediately processes them as inputs for the relevant subtopology.

      The problem, and this is very subtle, is that TopologyTestDriver does this recursively, which with some (apparently rare) programs can cause the output to be observed in an invalid order.

      One such program is the one I wrote to test the fix for KAFKA-9487. It involves a foreign-key join whose result is joined back to one of its inputs.

      Here's a simplified version:
      // foreign key join
      J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
      // equi-join
      OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))
      Let's say we have the following initial condition:
      (A) a1 = {v: X, b: b1}
      (B) b1 = {v: Y}
      (J) a1 = Pair({v: X, b: b1}, {v: Y})
      (OUT) a1 = Pair({v: X, b: b1}, Pair({v: X, b: b1}, {v: Y}))
      Now, piping an update:
      a1: {v: Z, b: b1}
      results immediately in two buffered results in the Producer:
      (FK join subscription): b1: {a1}
      (OUT): a1 = Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
      Note that the FK join result isn't updated synchronously, since it's an async operation, so the RHS lookup is temporarily incorrect, yielding the nonsense intermediate result where the outer pair has the updated value for a1, but the inner (fk result) one still has the old value for a1.
      However, we don't buffer that output record for consumption by testing code yet. We leave it in the internal Producer while we process the first intermediate record (the FK subscription).
      Processing that internal record means that we have a new internal record to process:
      (FK join subscription response): a1: {b1: {v: Y}}
      so right now, our internal-records-to-process stack looks like:
      (FK join subscription response): a1: {b1: {v: Y}}
      (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
      Again, we start by processing the first thing, the FK join response, which results in an updated FK join result:
      (J) a1: Pair({v: Z, b: b1}, {v: Y})
      and output:
      (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
      and, we still haven't handled the earlier output, so now our internal-records-to-process stack looks like:
      (J) a1: Pair({v: Z, b: b1}, {v: Y})
      (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
      (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
      At this point, there's nothing else to process in internal topics, so we just copy the records one by one to the "output" collection for later handling by testing code, but this yields the wrong final state of:
      (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
      That was an incorrect intermediate result, but because we're processing internal records recursively (as a stack), it winds up emitted at the end instead of in the middle.
      If we change the processing model from a stack to a queue, the correct order is preserved, and the final state is:
      (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))

      This is what I did in https://github.com/apache/kafka/pull/8015
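
      The ordering difference between recursive (stack-like) and queue-based draining can be reproduced with a toy model. The sketch below is not the TopologyTestDriver code; the class name DrainOrderSketch, the record names, and the PRODUCES table are hypothetical stand-ins for the FK subscription, its response, and the stale and fresh OUT records from the walkthrough above (the J record is left out for brevity).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class DrainOrderSketch {
    // Hypothetical toy topology: the records captured by the producer
    // when a given record is processed.
    static final Map<String, List<String>> PRODUCES = Map.of(
        "input:a1", List.of("internal:subscription", "OUT:stale"),
        "internal:subscription", List.of("internal:response"),
        "internal:response", List.of("OUT:fresh"));

    static List<String> process(String record) {
        return PRODUCES.getOrDefault(record, List.of());
    }

    // Recursive draining: an internal record is processed to completion
    // before the remaining captured records are looked at.
    static void drainRecursively(String record, List<String> observed) {
        for (String out : process(record)) {
            if (out.startsWith("internal:")) {
                drainRecursively(out, observed);
            } else {
                observed.add(out);
            }
        }
    }

    // Queue-based draining: captured records are handled strictly in the
    // order in which they were produced.
    static List<String> drainWithQueue(String input) {
        List<String> observed = new ArrayList<>();
        Deque<String> pending = new ArrayDeque<>(process(input));
        while (!pending.isEmpty()) {
            String next = pending.removeFirst();
            if (next.startsWith("internal:")) {
                pending.addAll(process(next)); // appended at the tail
            } else {
                observed.add(next);
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        List<String> recursive = new ArrayList<>();
        drainRecursively("input:a1", recursive);
        System.out.println(recursive);                  // [OUT:fresh, OUT:stale]
        System.out.println(drainWithQueue("input:a1")); // [OUT:stale, OUT:fresh]
    }
}
```

      In this model the last element of each list is what testing code would see as the final state: the stale record with recursive draining, the fresh one with the queue.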


            Assignee: vvcephei John Roesler
            Reporter: vvcephei John Roesler