Details
Bug
Status: Resolved
Major
Resolution: Not A Bug
2.0.0
None
None
Description
When performing a KTable-to-KTable join after aggregation, the resulting KTable contains duplicates.
1. Caching disabled, join result not materialized => duplicates
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue());
2. Caching disabled, join result materialized => duplicates
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue(),
    Materialized.as("average-ratings"));
3. Caching enabled, join result materialized => no duplicates
// Enable record cache of size 10 MB.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Set commit interval to 1 second.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue(),
    Materialized.as("average-ratings"));
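A likely explanation for the difference between the three cases, consistent with the "Not A Bug" resolution, is that a KTable-KTable join emits a result whenever either input table is updated. Each incoming rating updates both the sum table and the count table, so with caching disabled the join can fire twice per rating, and one of those results pairs a new sum with an old count. The sketch below is a plain-Java simulation of that behavior for a single key, not Kafka Streams code. The class and method names are hypothetical, the update order (sum before count) is an assumption, and the cache is modeled simply as "only the latest result is forwarded at commit".

```java
import java.util.ArrayList;
import java.util.List;

public class JoinEmissionSketch {

    // Simulates one key and returns every average forwarded downstream.
    // Assumption: the reduce() table is updated before the count() table.
    static List<Double> emittedAverages(double[] ratings, boolean cacheEnabled) {
        List<Double> emitted = new ArrayList<>();
        Long count = null;    // current value in the count() table
        Double sum = null;    // current value in the reduce() table
        Double latest = null; // most recent join result for this key

        for (double rating : ratings) {
            // Update the sum table; this triggers the join against the old count.
            sum = (sum == null) ? rating : sum + rating;
            if (count != null) { // inner join: both sides must be present
                latest = sum / count;
                if (!cacheEnabled) {
                    emitted.add(latest); // intermediate, inconsistent result
                }
            }

            // Update the count table; this triggers the join again.
            count = (count == null) ? 1L : count + 1;
            latest = sum / count;
            if (!cacheEnabled) {
                emitted.add(latest); // consistent result
            }
        }

        // Simplified cache model: all updates fall into one commit interval,
        // so only the latest result per key is forwarded.
        if (cacheEnabled && latest != null) {
            emitted.add(latest);
        }
        return emitted;
    }

    public static void main(String[] args) {
        double[] ratings = {4.0, 2.0};
        System.out.println(emittedAverages(ratings, false)); // [4.0, 6.0, 3.0]
        System.out.println(emittedAverages(ratings, true));  // [3.0]
    }
}
```

With caching disabled, the two ratings 4.0 and 2.0 produce three outputs, including the intermediate value 6.0 (new sum 6.0 divided by old count 1). With the simplified cache, only the final average 3.0 is forwarded, which matches the "no duplicates" observation in case 3.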