Description
Stream-stream joins still seem to emit spurious results for some window configurations.
In my tests, it happened when setting `before` to 0 and having a grace period smaller than the window duration. More precisely, it seems to happen when `before` is set explicitly and the following holds:
window duration > grace period + before
How to reproduce
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.time.Duration; import java.time.Instant; import java.util.Properties; public class SpuriousLeftJoinTest { static final Duration WINDOW_DURATION = Duration.ofMinutes(10); static final Duration GRACE = Duration.ofMinutes(6); static final Duration BEFORE = Duration.ZERO; static final String LEFT_TOPIC_NAME = "LEFT_TOPIC"; static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC"; static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC"; private static TopologyTestDriver testDriver; private static TestInputTopic<String, Integer> inputTopicLeft; private static TestInputTopic<String, Integer> inputTopicRight; private static TestOutputTopic<String, Integer> outputTopic; public static Topology createTopology() { StreamsBuilder builder = new StreamsBuilder(); KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME); KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME); // return 1 if left join matched, otherwise 0 KStream<String, Integer> joined = leftStream.leftJoin( rightStream, (value1, value2) -> { if(value2 == null){ return 0; } return 1; }, JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE) .before(BEFORE) ); joined.to(OUTPUT_TOPIC_NAME); return builder.build(); } @Before public void setup() { Topology topology = createTopology(); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); 
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); testDriver = new TopologyTestDriver(topology, props); inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer()); inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer()); outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, Serdes.String().deserializer(), Serdes.Integer().deserializer()); } @After public void tearDown() { testDriver.close(); } @Test public void shouldEmitOnlyOneMessageForKey1(){ Instant now = Instant.now(); inputTopicLeft.pipeInput("key1", 12, now); inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION)); // send later record to increase stream time & close the window inputTopicLeft.pipeInput("other_key", 1212122, now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10)); while (! outputTopic.isEmpty()){ System.out.println(outputTopic.readKeyValue()); } } }
The stdout of the code above is:
KeyValue(key1, 0)
KeyValue(key1, 1)
However, it should be:
KeyValue(key1, 1)
Attachments
Issue Links
- links to