Kafka / KAFKA-7595

Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      When performing a KTable-to-KTable join after aggregation, the resulting KTable contains duplicates.

      1. caching disabled, not materialized => duplicates

      streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

      KTable<Long, Long> ratingCounts = ratingsById.count();
      KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

      KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
          (sum, count) -> sum / count.doubleValue());

      2. caching disabled, materialized => duplicates

      streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

      KTable<Long, Long> ratingCounts = ratingsById.count();
      KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

      KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
          (sum, count) -> sum / count.doubleValue(),
          Materialized.as("average-ratings"));

      3. caching enabled, materialized => no duplicates

      // Enable record cache of size 10 MB.
      streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
      // Set commit interval to 1 second.
      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

      KTable<Long, Long> ratingCounts = ratingsById.count();
      KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

      KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
          (sum, count) -> sum / count.doubleValue(),
          Materialized.as("average-ratings"));

       

      Demo app https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107 
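
      The three cases above can be reproduced without a broker. The following is a minimal plain-Java sketch, not Kafka Streams code, and it rests on assumptions rather than on the actual implementation: it assumes that with the record cache disabled every update to either aggregate is forwarded and triggers the join once, that the count is updated before the sum, and that with the cache enabled only the latest state per key reaches the join at flush time. The class JoinUpdateSketch and its methods are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinUpdateSketch {
    // Hypothetical stand-ins for the state behind ratingCounts and ratingSums.
    private final Map<Long, Long> counts = new HashMap<>();
    private final Map<Long, Double> sums = new HashMap<>();

    // Inner-join semantics: emit sum / count only when both sides have a value.
    private void emitJoin(long key, List<Double> out) {
        Long count = counts.get(key);
        Double sum = sums.get(key);
        if (count != null && sum != null) {
            out.add(sum / count.doubleValue());
        }
    }

    // Assumed behavior with the cache disabled: each aggregate update is
    // forwarded immediately, so one input record triggers the join twice.
    static List<Double> withoutCache(long key, double[] ratings) {
        JoinUpdateSketch s = new JoinUpdateSketch();
        List<Double> out = new ArrayList<>();
        for (double rating : ratings) {
            s.counts.merge(key, 1L, Long::sum);
            s.emitJoin(key, out); // fired by the count update (sum is still stale)
            s.sums.merge(key, rating, Double::sum);
            s.emitJoin(key, out); // fired by the sum update
        }
        return out;
    }

    // Assumed behavior with the cache enabled: updates to the same key are
    // collapsed, and only the latest state is joined once per flush.
    static List<Double> withCache(long key, double[] ratings) {
        JoinUpdateSketch s = new JoinUpdateSketch();
        List<Double> out = new ArrayList<>();
        for (double rating : ratings) {
            s.counts.merge(key, 1L, Long::sum);
            s.sums.merge(key, rating, Double::sum);
        }
        s.emitJoin(key, out); // single flush after both records
        return out;
    }

    public static void main(String[] args) {
        double[] ratings = {4.0, 2.0};
        System.out.println(withoutCache(1L, ratings)); // [4.0, 2.0, 3.0]
        System.out.println(withCache(1L, ratings));    // [3.0]
    }
}
```

      Under these assumptions, two ratings for one key produce three join results without the cache, including a transient 2.0 computed from a new count and an old sum, but only the final 3.0 with the cache. The extra records are intermediate updates to the same key rather than distinct rows, which is consistent with the "Not A Bug" resolution.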

          People

            Assignee: Unassigned
            Reporter: Vik Gamov (vikgamov)
            Votes: 0
            Watchers: 4
