Kafka / KAFKA-9244

Update of old FK reference on RHS should not trigger join result


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.0
    • Component/s: streams
    • Labels:
      None

      Description

      In a KTable-KTable foreign-key join, after the foreign key reference of an LHS record changes from FK1 to FK2, an update to the RHS record keyed by FK1 should not produce a join result.

      The test case below fails at the last "is(emptyMap())" assertion on the output topic, which runs after we publish an update on the RHS for the previously associated foreign key.

       

          @Test
          public void doJoinFromLeftThenChangeForeignKeyMappingOfLeft() {
              final Topology topology = getTopology(streamsConfig, "store", leftJoin);
              try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
                  final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
                  final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                  final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                  final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
      
                  // Pre-populate the RHS records. This test checks what happens when an LHS record's
                  // foreign key reference changes and the previously referenced RHS record is then updated.
                  right.pipeInput("rhs1", "rhsValue1");
                  right.pipeInput("rhs2", "rhsValue2");
      
                  assertThat(
                          outputTopic.readKeyValuesToMap(),
                          is(emptyMap())
                  );
                  assertThat(
                          asMap(store),
                          is(emptyMap())
                  );
      
                  left.pipeInput("lhs1", "lhsValue1|rhs1");
      
                  {
                      final Map<String, String> expected = mkMap(
                              mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                      );
                      assertThat(
                              outputTopic.readKeyValuesToMap(),
                              is(expected)
                      );
                      assertThat(
                              asMap(store),
                              is(expected)
                      );
                  }
      
                  // Change LHS foreign key reference
                  left.pipeInput("lhs1", "lhsValue1|rhs2");
                  {
                      final Map<String, String> expected = mkMap(
                              mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                      );
                      assertThat(
                              outputTopic.readKeyValuesToMap(),
                              is(expected)
                      );
                      assertThat(
                              asMap(store),
                              is(expected)
                      );
                  }
      
                  // Update the RHS record that the LHS record no longer references (old foreign key);
                  // no new join result is expected
                  right.pipeInput("rhs1", "rhsValue1Delta");
                  {
                      assertThat(
                              outputTopic.readKeyValuesToMap(),
                              is(emptyMap())
                      );
                      assertThat(
                              asMap(store),
                              is(mkMap(
                                      mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                              ))
                      );
                  }
              }
          }
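
      The expected semantics can be modeled without Kafka Streams. The sketch below is a hypothetical in-memory model, not the Kafka Streams implementation: the class FkJoinSketch and its methods putLeft, putRight, and extractFk are names invented for illustration. It assumes the same "value|foreignKey" layout the test uses for LHS values and, for brevity, models only an inner join. The point it illustrates is that an RHS update should produce output only for LHS records whose current foreign key equals the updated RHS key.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the expected join semantics (not Kafka Streams code).
class FkJoinSketch {
    private final Map<String, String> left = new HashMap<>();   // LHS key -> "value|foreignKey"
    private final Map<String, String> right = new HashMap<>();  // RHS key -> value

    // Assumption: the foreign key is the part of the LHS value after the '|'.
    static String extractFk(String leftValue) {
        return leftValue.substring(leftValue.indexOf('|') + 1);
    }

    // LHS update: store the record and join it with the RHS record its new FK points at.
    Map<String, String> putLeft(String key, String value) {
        left.put(key, value);
        Map<String, String> out = new LinkedHashMap<>();
        String rhsValue = right.get(extractFk(value));
        if (rhsValue != null) {
            out.put(key, "(" + value + "," + rhsValue + ")");
        }
        return out;
    }

    // RHS update: emit results only for LHS records whose *current* FK equals rhsKey.
    Map<String, String> putRight(String rhsKey, String rhsValue) {
        right.put(rhsKey, rhsValue);
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            if (extractFk(e.getValue()).equals(rhsKey)) {
                out.put(e.getKey(), "(" + e.getValue() + "," + rhsValue + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        FkJoinSketch join = new FkJoinSketch();
        join.putRight("rhs1", "rhsValue1");
        join.putRight("rhs2", "rhsValue2");
        System.out.println(join.putLeft("lhs1", "lhsValue1|rhs1"));   // {lhs1=(lhsValue1|rhs1,rhsValue1)}
        System.out.println(join.putLeft("lhs1", "lhsValue1|rhs2"));   // {lhs1=(lhsValue1|rhs2,rhsValue2)}
        System.out.println(join.putRight("rhs1", "rhsValue1Delta"));  // {} (lhs1 no longer references rhs1)
    }
}
```

      The final call mirrors the failing step of the test: because lhs1 now references rhs2, the update to rhs1 yields an empty result in this model.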
      

            People

            • Assignee: Matthias J. Sax (mjsax)
            • Reporter: Kin Siu (nikuis)
