Kafka / KAFKA-4153

Incorrect KStream-KStream join behavior with asymmetric time window

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0.1
    • Fix Version/s: 0.10.1.0
    • Component/s: streams
    • Labels:
      None
    • Flags:
      Patch

      Description

      Using Kafka 0.10.0.1, consider joining records from two streams that are separated by some time, but only when the records from one stream are newer than the records from the other, i.e.:

      stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(10000))

      One would expect that the following would be equivalent:

      stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(10000))

      Alas, this is not the case. Instead, this generates the same output as the first example:

      stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(10000))

      The problem is that the DefaultJoin implementation in KStreamImpl fails to swap the before and after values when it creates the KStreamKStreamJoin for the other stream, even though it calls reverseJoiner to reverse the joiner.
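The effect of the missing swap can be reproduced without Kafka. The sketch below is a hypothetical, dependency-free model, not the KStreamImpl or KStreamKStreamJoin code: it assumes all records share one key, that each arriving record is buffered on its own side and probes the other side's buffer, and that a stream2 record may be at most beforeMs earlier or afterMs later than a stream1 record (the JoinWindows Javadoc semantics discussed later in this thread). The class, the Arrival type, and the swapForOther flag are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of a windowed KStream-KStream inner join.
// It is NOT the KStreamImpl/KStreamKStreamJoin code; it only mimics the idea that
// each arriving record is buffered on its own side and probes the other side.
public class AsymmetricJoinSketch {

    /** One arriving record (all records share the same key, so only timestamps matter). */
    public static final class Arrival {
        final boolean fromThis; // true = stream1 ("this"), false = stream2 ("other")
        final long ts;          // record timestamp in ms

        public Arrival(boolean fromThis, long ts) {
            this.fromThis = fromThis;
            this.ts = ts;
        }
    }

    /**
     * Processes arrivals in order and returns joined pairs as "thisTs,otherTs".
     * Assumed semantics: a stream2 record joins a stream1 record if it is at most
     * beforeMs earlier or at most afterMs later than the stream1 record.
     *
     * @param swapForOther true: a stream2 arrival probes stream1 with the bounds
     *                     swapped, [ts - afterMs, ts + beforeMs] (the fix);
     *                     false: it reuses [ts - beforeMs, ts + afterMs] (the bug)
     */
    public static List<String> join(List<Arrival> arrivals, long beforeMs, long afterMs,
                                    boolean swapForOther) {
        List<Long> thisBuf = new ArrayList<>();
        List<Long> otherBuf = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.fromThis) {
                // stream1 arrival: look for stream2 records in [ts - beforeMs, ts + afterMs]
                for (long o : otherBuf) {
                    if (o >= a.ts - beforeMs && o <= a.ts + afterMs) {
                        out.add(a.ts + "," + o);
                    }
                }
                thisBuf.add(a.ts);
            } else {
                // stream2 arrival: the window must be mirrored around this record
                long lo = a.ts - (swapForOther ? afterMs : beforeMs);
                long hi = a.ts + (swapForOther ? beforeMs : afterMs);
                for (long t : thisBuf) {
                    if (t >= lo && t <= hi) {
                        out.add(t + "," + a.ts); // keep (stream1, stream2) order, like a reversed joiner
                    }
                }
                otherBuf.add(a.ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // beforeMs = 0, afterMs = 10000: stream2 may lag stream1 by up to 10 s
        List<Arrival> valid = List.of(new Arrival(true, 0), new Arrival(false, 5_000));
        List<Arrival> invalid = List.of(new Arrival(true, 5_000), new Arrival(false, 0));
        System.out.println(join(valid, 0, 10_000, true));    // [0,5000]
        System.out.println(join(valid, 0, 10_000, false));   // []
        System.out.println(join(invalid, 0, 10_000, true));  // []
        System.out.println(join(invalid, 0, 10_000, false)); // [5000,0]
    }
}
```

With the bounds reused unchanged, the result depends on arrival order: the valid pair is found only when the stream1 record arrives last, and the invalid pair (stream2 older than stream1) is emitted when the stream2 record arrives last.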

          Activity

          ASF GitHub Bot added a comment -

          GitHub user eliaslevy opened a pull request:

          https://github.com/apache/kafka/pull/1846

          KAFKA-4153: Fix incorrect KStream-KStream join behavior with asymmetric time window

          The contribution is my original work and I license the work to the project under the project's open source license.

          @guozhangwang

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/eliaslevy/kafka KAFKA-4153

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1846.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1846


          commit a82897f5e4933698c95de1a77ae6ae6f4c721743
          Author: Elias Levy <fearsome.lucidity@gmail.com>
          Date: 2016-09-12T22:27:34Z

          Swap before & after values for other KStreamKstreamJoin

          commit 05721ca926321e298fbd3481c96db176dfec9716
          Author: Elias Levy <fearsome.lucidity@gmail.com>
          Date: 2016-09-12T22:28:37Z

          Add tests for asymetric window stream-stream joins


          Elias Levy added a comment -

          As a side note, the API for JoinWindows has become mangled since 0.10.0.1. Now, to instantiate an asymmetric JoinWindows, you must first call JoinWindows.of(0) before you can call before or after to set the actual window boundaries. The class should provide alternative static methods for both before and after, so that it can be instantiated without first calling of.

          Matthias J. Sax added a comment -

          Well, you can just call JoinWindows.of(before).after(after) (or the reverse). However, it might be good to add JoinWindows#of(long before, long after).
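The chained form and the proposed two-argument factory can be compared in a small value class. WindowSpec, its fields, and its validation rule are hypothetical, not the Kafka Streams JoinWindows API; the sketch only illustrates that a factory taking both bounds never exposes a half-configured window, and that the instance methods can return copies instead of mutating.

```java
// Hypothetical immutable window specification; NOT the Kafka Streams JoinWindows class.
public final class WindowSpec {
    public final long beforeMs; // how far the other record may precede the primary record
    public final long afterMs;  // how far the other record may follow the primary record

    private WindowSpec(long beforeMs, long afterMs) {
        // Assumed rule: the interval [-beforeMs, +afterMs] must not be empty.
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("window interval (beforeMs + afterMs) must not be negative");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    /** Symmetric window. */
    public static WindowSpec of(long timeDifferenceMs) {
        return new WindowSpec(timeDifferenceMs, timeDifferenceMs);
    }

    /** Asymmetric window with both bounds given at once (the proposed two-argument form). */
    public static WindowSpec of(long beforeMs, long afterMs) {
        return new WindowSpec(beforeMs, afterMs);
    }

    /** Copy with a new before bound; this instance is never modified. */
    public WindowSpec before(long newBeforeMs) {
        return new WindowSpec(newBeforeMs, afterMs);
    }

    /** Copy with a new after bound; this instance is never modified. */
    public WindowSpec after(long newAfterMs) {
        return new WindowSpec(beforeMs, newAfterMs);
    }

    public static void main(String[] args) {
        WindowSpec chained = WindowSpec.of(100).after(50); // of(before).after(after)
        WindowSpec direct = WindowSpec.of(100, 50);        // proposed of(before, after)
        System.out.println(chained.beforeMs + "/" + chained.afterMs); // 100/50
        System.out.println(direct.beforeMs + "/" + direct.afterMs);   // 100/50
    }
}
```

Both calls produce the same window, but only the two-argument form states the intent in one place, which is the readability concern raised in the next comment.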

          Elias Levy added a comment -

          You could do that, but it would be non-obvious to someone reading the code what the semantics of that window should be.

          I would prefer to see static alternatives for before and after, so that you could write JoinWindows.before(100).after(50) or similar.

          Matthias J. Sax added a comment -

          IMHO, JoinWindows.before(100).after(50) would not be a good choice: the user could call only JoinWindows.before(100), creating a window with an undefined value for after. In my view, the API should make it impossible to define an invalid window, rather than checking for this condition and throwing an exception later on.

          Elias Levy added a comment -

          Why would the other value be undefined? Default it to zero.
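The zero-default proposal can be sketched as a pair of static factories. DirectionalWindow and its method names are hypothetical, not Kafka Streams API. One Java constraint shapes the sketch: a class cannot declare a static and an instance method with the same signature, so the chaining methods here are called withBefore and withAfter rather than before and after.

```java
// Hypothetical sketch of static before/after factories with zero defaults;
// NOT the Kafka Streams JoinWindows API.
public final class DirectionalWindow {
    public final long beforeMs;
    public final long afterMs;

    private DirectionalWindow(long beforeMs, long afterMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    /** Window reaching beforeMs into the past; the after bound defaults to zero. */
    public static DirectionalWindow before(long beforeMs) {
        return new DirectionalWindow(beforeMs, 0L);
    }

    /** Window reaching afterMs into the future; the before bound defaults to zero. */
    public static DirectionalWindow after(long afterMs) {
        return new DirectionalWindow(0L, afterMs);
    }

    /** Copy with a new before bound (named differently to avoid clashing with the static factory). */
    public DirectionalWindow withBefore(long newBeforeMs) {
        return new DirectionalWindow(newBeforeMs, afterMs);
    }

    /** Copy with a new after bound (named differently to avoid clashing with the static factory). */
    public DirectionalWindow withAfter(long newAfterMs) {
        return new DirectionalWindow(beforeMs, newAfterMs);
    }

    @Override
    public String toString() {
        return "[-" + beforeMs + ", +" + afterMs + "]";
    }

    public static void main(String[] args) {
        System.out.println(DirectionalWindow.before(100));               // [-100, +0]
        System.out.println(DirectionalWindow.before(100).withAfter(50)); // [-100, +50]
    }
}
```

Under this assumption, calling only before(100) still yields a fully defined window, [-100, +0], rather than one with an undefined after bound.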

          Guozhang Wang added a comment -

          According to its Javadoc, the JoinWindows definition should be reversed: for example, the before function states that records are joinable if the timestamp of a record from the secondary stream is earlier than or equal to the timestamp of a record from the first stream. It is indeed a bug that should be fixed; we can review the PR and merge it.
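The Javadoc semantics described here fit in one predicate, which also makes the equivalence claimed in the issue description checkable. Because primaryTs - beforeMs <= secondaryTs <= primaryTs + afterMs is the same condition as secondaryTs - afterMs <= primaryTs <= secondaryTs + beforeMs, swapping which stream is primary amounts to swapping before and after. JoinWindowSemantics and its methods are hypothetical helpers, not Kafka API, and the grid check is a sanity test over a small range rather than a proof.

```java
// Hypothetical helper encoding the JoinWindows Javadoc semantics; NOT part of the Kafka API.
public final class JoinWindowSemantics {

    /** True if the secondary record is at most beforeMs earlier or afterMs later than the primary. */
    public static boolean joinable(long primaryTs, long secondaryTs, long beforeMs, long afterMs) {
        return secondaryTs >= primaryTs - beforeMs && secondaryTs <= primaryTs + afterMs;
    }

    /** Checks on a small timestamp grid that swapping stream roles equals swapping before/after. */
    public static boolean mirrorHolds(long beforeMs, long afterMs) {
        for (long p = -20; p <= 20; p++) {
            for (long s = -20; s <= 20; s++) {
                if (joinable(p, s, beforeMs, afterMs) != joinable(s, p, afterMs, beforeMs)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // after(10) only: a secondary record 5 ms later joins, one 5 ms earlier does not
        System.out.println(joinable(0, 5, 0, 10));  // true
        System.out.println(joinable(0, -5, 0, 10)); // false
        System.out.println(mirrorHolds(0, 10));     // true
    }
}
```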

          As for defining asymmetric JoinWindows, I think it is better to complement the current before and after functions with static constructors, since they define the "length" of the window as final values that should not be overridden later.

          Elias Levy added a comment -

          I've updated the PR to reverse the before & after semantics as you've pointed out.

          Guozhang Wang added a comment -

          Issue resolved by pull request 1846
          https://github.com/apache/kafka/pull/1846

          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/1846

          Guozhang Wang added a comment -

          Thanks Elias Levy, I have added you to the contributor list so you can assign tickets to yourself moving forward.


            People

            • Assignee:
              Elias Levy
              Reporter:
              Elias Levy
            • Votes:
              0
              Watchers:
              4
