Kafka / KAFKA-13721

Left join still emits spurious results in stream-stream joins in some cases


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.0
    • Fix Version/s: 3.2.0, 3.1.1
    • Component/s: streams
    • Labels: None

    Description

      Stream-stream joins still seem to emit spurious results for some window configurations.

      From my tests, it happens when before is set to 0 and the grace period is smaller than the window duration. More precisely, it seems to happen whenever before and the window duration are set such that:
      window duration > grace period + before
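The window arithmetic behind this condition can be sketched in plain Java. JoinWindowMath is a hypothetical helper, not Kafka Streams code. It assumes that for JoinWindows.ofTimeDifferenceAndGrace(after, grace).before(before) a right record joins a left record at time t when t - before <= rightTs <= t + after (inclusive), and that an unmatched left-join result should only be emitted once stream time passes t + after + grace. With the values from the reproduction, the right record lands exactly on the window's upper bound, and the left record's window should stay open until 16 minutes.

```java
import java.time.Duration;

// Hypothetical helper (not part of Kafka Streams) mirroring the window arithmetic
// assumed for JoinWindows.ofTimeDifferenceAndGrace(after, grace).before(before).
public class JoinWindowMath {

    // Assumption: a right record joins a left record if
    // leftTs - before <= rightTs <= leftTs + after (both bounds inclusive).
    static boolean inWindow(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    // Assumption: stream time after which the left record's window is closed and
    // an unmatched (null-padded) left-join result may be emitted.
    static long windowCloseTime(long leftTs, long afterMs, long graceMs) {
        return leftTs + afterMs + graceMs;
    }

    // The configuration reported in this issue to trigger the spurious result.
    static boolean matchesReportedCondition(long windowMs, long graceMs, long beforeMs) {
        return windowMs > graceMs + beforeMs;
    }

    public static void main(String[] args) {
        long window = Duration.ofMinutes(10).toMillis();
        long grace = Duration.ofMinutes(6).toMillis();
        long before = Duration.ZERO.toMillis();

        long leftTs = 0L;            // left record at "now"
        long rightTs = leftTs + window; // right record at "now + WINDOW_DURATION"

        System.out.println("right record in window: " + inWindow(leftTs, rightTs, before, window));
        System.out.println("window closes at stream time (ms): " + windowCloseTime(leftTs, window, grace));
        System.out.println("matches reported condition: " + matchesReportedCondition(window, grace, before));
    }
}
```

Running main prints true, 960000 and true for the three checks: the pair should join, and no null-padded result should be due before stream time passes 16 minutes.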

      How to reproduce

      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.TestInputTopic;
      import org.apache.kafka.streams.TestOutputTopic;
      import org.apache.kafka.streams.Topology;
      import org.apache.kafka.streams.TopologyTestDriver;
      import org.apache.kafka.streams.kstream.JoinWindows;
      import org.apache.kafka.streams.kstream.KStream;
      import org.junit.After;
      import org.junit.Before;
      import org.junit.Test;
      
      import java.time.Duration;
      import java.time.Instant;
      import java.util.Properties;
      
      public class SpuriousLeftJoinTest {
      
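          // Window configuration that triggers the bug: before = 0 and
          // WINDOW_DURATION (10 min) > GRACE (6 min) + BEFORE (0)
          static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
          static final Duration GRACE = Duration.ofMinutes(6);
          static final Duration BEFORE = Duration.ZERO;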
          static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
          static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
          static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";
      
      
          private static TopologyTestDriver testDriver;
          private static TestInputTopic<String, Integer> inputTopicLeft;
          private static TestInputTopic<String, Integer> inputTopicRight;
          private static TestOutputTopic<String, Integer> outputTopic;
      
          public static Topology createTopology() {
      
              StreamsBuilder builder = new StreamsBuilder();
      
              KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
              KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME);
      
              // return 1 if left join matched, otherwise 0
              KStream<String, Integer> joined = leftStream.leftJoin(
                  rightStream,
                  (value1, value2) -> {
                      if (value2 == null) {
                          return 0;
                      }
                      return 1;
                  },
                  JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
                      .before(BEFORE)
              );
      
              joined.to(OUTPUT_TOPIC_NAME);
      
              return builder.build();
          }
      
      
          @Before
          public void setup() {
      
              Topology topology = createTopology();
      
              Properties props = new Properties();
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
              props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
              props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
      
              testDriver = new TopologyTestDriver(topology, props);
      
              inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
              inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
      
              outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, Serdes.String().deserializer(), Serdes.Integer().deserializer());
      
          }
      
          @After
          public void tearDown() {
              testDriver.close();
          }
      
          @Test
          public void shouldEmitOnlyOneMessageForKey1(){
              Instant now = Instant.now();
              inputTopicLeft.pipeInput("key1", 12, now);
              inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));
      
              // send later record to increase stream time & close the window
              inputTopicLeft.pipeInput("other_key", 1212122, now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));
      
              while (! outputTopic.isEmpty()){
                  System.out.println(outputTopic.readKeyValue());
              }
          }
      
      
      }
      

      The test above prints:

      KeyValue(key1, 0)
      KeyValue(key1, 1)
      

      However, it should print:

      KeyValue(key1, 1)
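To make the expected behavior concrete, here is a toy, single-partition model of the intended left-join semantics under the same assumptions (inclusive window bounds, unmatched results emitted only once stream time passes t + after + grace). ToyLeftJoin and its methods are hypothetical; the model ignores late-record dropping, serdes and state stores, and is not how Kafka Streams implements the join. Replaying the three records from the test yields only the joined result for key1.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy model of left-join semantics; NOT the Kafka Streams implementation.
public class ToyLeftJoin {

    // A buffered record; 'matched' is only meaningful for left records.
    static final class Rec {
        final String key;
        final long ts;
        boolean matched;
        Rec(String key, long ts) { this.key = key; this.ts = ts; }
    }

    final long beforeMs, afterMs, graceMs;
    long streamTime = Long.MIN_VALUE;
    final List<Rec> lefts = new ArrayList<>();   // left records whose window is still open
    final List<Rec> rights = new ArrayList<>();  // right records (never evicted in this toy)
    final List<String> output = new ArrayList<>(); // "key=1" joined, "key=0" unmatched

    ToyLeftJoin(long beforeMs, long afterMs, long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Same key and leftTs - before <= rightTs <= leftTs + after.
    boolean joins(Rec left, Rec right) {
        return left.key.equals(right.key)
            && right.ts >= left.ts - beforeMs
            && right.ts <= left.ts + afterMs;
    }

    void left(String key, long ts) {
        Rec l = new Rec(key, ts);
        for (Rec r : rights) {
            if (joins(l, r)) { output.add(key + "=1"); l.matched = true; }
        }
        lefts.add(l);
        advance(ts);
    }

    void right(String key, long ts) {
        Rec r = new Rec(key, ts);
        for (Rec l : lefts) { // only left records whose window is still open
            if (joins(l, r)) { output.add(key + "=1"); l.matched = true; }
        }
        rights.add(r);
        advance(ts);
    }

    // Advance stream time; emit "key=0" for an unmatched left record only
    // after its window (after + grace) has closed, then drop it.
    void advance(long ts) {
        streamTime = Math.max(streamTime, ts);
        Iterator<Rec> it = lefts.iterator();
        while (it.hasNext()) {
            Rec l = it.next();
            if (streamTime > l.ts + afterMs + graceMs) {
                if (!l.matched) output.add(l.key + "=0");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        long min = 60_000L;
        ToyLeftJoin join = new ToyLeftJoin(0L, 10 * min, 6 * min);
        join.left("key1", 0L);
        join.right("key1", 10 * min);
        join.left("other_key", 16 * min + 10_000L); // pushes stream time past the close
        System.out.println(join.output); // prints [key1=1]
    }
}
```

Because the right record arrives before the left record's window closes, the model never emits the null-padded key1=0 that the test above observes.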
      


            People

              Assignee: Matthias J. Sax (mjsax)
              Reporter: Nollain Nollet
              Votes: 0
              Watchers: 2
