KAFKA-8315

Historical join issues


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.0.0
    • Component/s: streams
    • Labels: None

    Description

      The problem we are experiencing is that we cannot reliably perform simple joins over pre-populated Kafka topics. This seems more apparent when one topic has records at less frequent timestamp intervals than the other.
      An example of the issue is provided in this repository:

      https://github.com/the4thamigo-uk/join-example

       
      The only way to increase the period of historically joined records is to increase the grace period for the join windows, and this has repercussions when it is extended to a large period, e.g. 2 years of minute-by-minute records.
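
To get a feel for the scale involved, the following Java sketch estimates how many records per key would fall inside the retained span if the grace period covered the full history. It assumes one record per minute per key and that the retained state grows roughly with window size plus grace period. The class and method names are illustrative only and are not part of Kafka Streams.

```java
import java.time.Duration;

// Back-of-the-envelope sketch; these names are hypothetical, not Kafka Streams API.
final class JoinRetentionEstimate {

    // Number of records per key that fall inside the retained span,
    // assuming records arrive at a fixed interval.
    static long recordsRetained(Duration window, Duration grace, Duration recordInterval) {
        Duration retained = window.plus(grace);
        return retained.toMillis() / recordInterval.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(1);     // assumed join window size
        Duration grace = Duration.ofDays(2 * 365);   // roughly two years of history
        Duration interval = Duration.ofMinutes(1);   // minute-by-minute records
        System.out.println(recordsRetained(window, grace, interval));
    }
}
```

With these assumed values the estimate is on the order of a million records per key, which gives a sense of why a very large grace period is costly.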

      Related Slack conversations:

      https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300

      https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900

       
      Research on this issue has gone through a few phases:

      1) This issue was initially thought to be due to the inability to set the retention period for a join window via `Materialized`:

      The documentation says to use `Materialized` rather than `JoinWindows.until()` (https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-), but there is nowhere to pass a `Materialized` instance to the join operation; it seems that only the group operation supports this.

      This was considered to be a problem with the documentation, not with the API, and is addressed in https://github.com/apache/kafka/pull/6664

      2) We then found an apparent issue in the code which would affect the partition that is selected to deliver the next record to the join. This would only be a problem for data that is out-of-order, and join-example uses data that is in order of timestamp in both topics. So this fix is thought not to affect join-example.

      This was considered to be an issue and is being addressed in https://github.com/apache/kafka/pull/6719

      3) Further investigation using a crafted unit test suggests that partition selection and ordering (PartitionGroup/RecordQueue) work correctly:

      https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b
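
For readers unfamiliar with the mechanism examined in items 2 and 3: when a task reads from several partitions, Kafka Streams buffers records per partition and is documented as processing next the buffered record with the lowest timestamp. The sketch below is a simplified, standalone model of that selection. The `Rec` type and `mergeByTimestamp` method are hypothetical and do not reproduce the real PartitionGroup/RecordQueue code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model only; not the Kafka Streams implementation.
final class TimestampMergeSketch {

    // Hypothetical record type: source topic name and record timestamp.
    static final class Rec {
        final String topic;
        final long timestamp;
        Rec(String topic, long timestamp) { this.topic = topic; this.timestamp = timestamp; }
        @Override public String toString() { return topic + "@" + timestamp; }
    }

    // Repeatedly takes the head record of whichever buffer has the lowest head timestamp.
    static List<Rec> mergeByTimestamp(List<Deque<Rec>> buffers) {
        PriorityQueue<Deque<Rec>> byHead = new PriorityQueue<>(
            Comparator.comparingLong((Deque<Rec> q) -> q.peekFirst().timestamp));
        for (Deque<Rec> q : buffers) {
            if (!q.isEmpty()) byHead.add(q);
        }
        List<Rec> out = new ArrayList<>();
        while (!byHead.isEmpty()) {
            Deque<Rec> q = byHead.poll();      // buffer with the lowest head timestamp
            out.add(q.pollFirst());
            if (!q.isEmpty()) byHead.add(q);   // re-insert so it is ordered by its new head
        }
        return out;
    }

    public static void main(String[] args) {
        Deque<Rec> x = new ArrayDeque<>(Arrays.asList(new Rec("x", 10), new Rec("x", 70), new Rec("x", 130)));
        Deque<Rec> y = new ArrayDeque<>(Arrays.asList(new Rec("y", 20), new Rec("y", 100)));
        System.out.println(mergeByTimestamp(Arrays.asList(x, y)));
    }
}
```

Note that this model only sees records that have already been buffered. If one partition's buffer happens to be empty at the moment of selection, records from the other partition can run ahead, which would be consistent with the assumption in item 4.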

      4) The current assumption is that the issue is rooted in the way records are consumed from the topics:

      We have tried setting various options to suppress reads from the source topics, but it does not seem to make any difference: https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63
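
The linked commit is not reproduced here. As an illustration of the kind of settings that influence how records are fetched and buffered, the sketch below builds a plain `Properties` object using existing Kafka Streams and consumer configuration keys. The values are arbitrary assumptions, the class and method names are hypothetical, and which keys the linked commit actually changed is not stated in this issue.

```java
import java.util.Properties;

// Illustrative only: example overrides, not the settings used in join-example.
final class ConsumptionTuningSketch {

    static Properties exampleOverrides() {
        Properties props = new Properties();
        // Kafka Streams: how long a task may stay idle waiting for data on all of
        // its input partitions before processing what it already has buffered.
        props.put("max.task.idle.ms", "5000");
        // Kafka Streams: maximum number of records to buffer per partition.
        props.put("buffered.records.per.partition", "100");
        // Consumer: maximum number of records returned by a single poll().
        props.put("max.poll.records", "50");
        return props;
    }

    public static void main(String[] args) {
        exampleOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Such a `Properties` object would typically be merged into the configuration passed to the streams application; whether any particular value changes the join results is exactly what this item leaves open.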

       

      Attachments

        1. code.java
          3 kB
          Andrew

            People

              Assignee: vvcephei John Roesler
              Reporter: the4thamigo_uk Andrew