Kafka / KAFKA-16394

ForeignKey LEFT join propagates null value on foreignKey change

Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.7.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      We have two topics: left-topic[String, LeftRecord] and right-topic[String, String],

      where LeftRecord is:

       case class LeftRecord(foreignKey: String, name: String)

      We do a simple LEFT foreign-key join of left-topic against right-topic, using the foreignKey field of LeftRecord as the foreign key. The join result is the value from right-topic.

       

      Scenario 1: Change foreignKey

      Input

      rightTopic.pipeInput("fk1", "1")
      rightTopic.pipeInput("fk2", "2") 
      
      leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
      leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
      

       

      Expected result

      KeyValue(pk1, 1)
      KeyValue(pk1, 2)

       

      Actual result

      KeyValue(pk1, 1)
      KeyValue(pk1, null)
      KeyValue(pk1, 2)

       

      A null is propagated to the join result when the foreign key changes, before the value for the new foreign key is emitted.
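For readers without the attachments, here is a minimal sketch of how the join and Scenario 1 might be reproduced with TopologyTestDriver. It is not the attached ForeignJoinTest.scala. The object name, the "output-topic" name, the application id, and the pipe-delimited serde (a stand-in for JsonSerde.scala) are all assumptions made for illustration. It calls the Java DSL directly from Scala and assumes kafka-streams and kafka-streams-test-utils are on the classpath.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import java.util.function.{Function => JFunction}

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.kstream.{Consumed, KTable, Materialized, Produced, ValueJoiner}
import org.apache.kafka.streams.state.KeyValueStore

case class LeftRecord(foreignKey: String, name: String)

object ForeignKeyLeftJoinSketch {
  // Hypothetical stand-in for the attached JsonSerde.scala:
  // encodes a record as "foreignKey|name" and maps null to null (tombstone).
  val leftRecordSerde: Serde[LeftRecord] = Serdes.serdeFrom(
    new Serializer[LeftRecord] {
      override def serialize(topic: String, r: LeftRecord): Array[Byte] =
        if (r == null) null else s"${r.foreignKey}|${r.name}".getBytes(UTF_8)
    },
    new Deserializer[LeftRecord] {
      override def deserialize(topic: String, bytes: Array[Byte]): LeftRecord =
        if (bytes == null) null
        else {
          val parts = new String(bytes, UTF_8).split("\\|", 2)
          LeftRecord(parts(0), parts(1))
        }
    }
  )

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()
    val left: KTable[String, LeftRecord] =
      builder.table("left-topic", Consumed.`with`(Serdes.String(), leftRecordSerde))
    val right: KTable[String, String] =
      builder.table("right-topic", Consumed.`with`(Serdes.String(), Serdes.String()))

    // Foreign key comes from the left value; the join result is the right value.
    val foreignKeyOf: JFunction[LeftRecord, String] = (l: LeftRecord) => l.foreignKey
    val keepRightValue: ValueJoiner[LeftRecord, String, String] =
      (_: LeftRecord, r: String) => r
    val joined: Materialized[String, String, KeyValueStore[Bytes, Array[Byte]]] =
      Materialized.`with`(Serdes.String(), Serdes.String())

    left.leftJoin(right, foreignKeyOf, keepRightValue, joined)
      .toStream()
      .to("output-topic", Produced.`with`(Serdes.String(), Serdes.String()))
    builder.build()
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-left-join-sketch")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092") // never contacted by the test driver

    val driver = new TopologyTestDriver(buildTopology(), props)
    try {
      val str = Serdes.String()
      val rightTopic = driver.createInputTopic("right-topic", str.serializer(), str.serializer())
      val leftTopic = driver.createInputTopic("left-topic", str.serializer(), leftRecordSerde.serializer())
      val output = driver.createOutputTopic("output-topic", str.deserializer(), str.deserializer())

      // Scenario 1: the same primary key moves from fk1 to fk2.
      rightTopic.pipeInput("fk1", "1")
      rightTopic.pipeInput("fk2", "2")
      leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
      leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))

      // Print every record the join emitted, in order.
      output.readKeyValuesToList().forEach(kv => println(kv))
    } finally driver.close()
  }
}
```

Running the same sketch against the 3.6.0 and 3.7.0 artifacts should make the difference in emitted records visible. Replacing the second left-topic input with `leftTopic.pipeInput("pk1", null)` gives Scenario 2.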

       

      Scenario 2: Delete PrimaryKey

      Input

      rightTopic.pipeInput("fk1", "1")
      rightTopic.pipeInput("fk2", "2")
      
      leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
      leftTopic.pipeInput("pk1", null) 

       

      Expected result

      KeyValue(pk1, 1)
      KeyValue(pk1, null) 

       

      Actual result

      KeyValue(pk1, 1)
      KeyValue(pk1, null)
      KeyValue(pk1, null) 

      An additional null is propagated to the join result.

       

      This bug doesn't exist on versions 3.6.0 and below.

       

      I believe the issue comes from the line below, where the deletion is propagated in both scenarios above:

      https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134

       

      Attaching the topology I used.

      Attachments

        1. ForeignJoinTest.scala
          2 kB
          Ayoub Omari
        2. JsonSerde.scala
          1 kB
          Ayoub Omari

            People

              Assignee: Ayoub Omari (ayoubomari)
              Reporter: Ayoub Omari (ayoubomari)
              Votes: 0
              Watchers: 5