We have an application that contains a broadcast process stage. The non-broadcast input receives roughly 10 million messages per second, while the broadcast side is a control stream on which messages rarely arrive.
After several hours of running, the TaskManager runs out of heap memory and restarts. We reviewed the application code and found no relevant issues.
We noticed that the time from startup to crash was roughly the same on every run. We then took a heap dump shortly before a crash and found a massive number of `CompletableFuture$UniRun` instances.
These `CompletableFuture$UniRun` instances consume several gigabytes of memory.
The following screenshot is from a heap dump we took from a mock test job running the same scenario.
After some research in the source code, we found that the issue is likely caused by `StreamMultipleInputProcessor.getAvailableFuture()`.
`StreamMultipleInputProcessor` has multiple input processors, and its available future completes when the available future of any of its inputs completes.
The current implementation creates a new `CompletableFuture` on every call and appends a new `CompletableFuture$UniRun` to each delegate input processor's available future.
The issue is caused by these `CompletableFuture$UniRun` instances stacking up on the slow input processor's available future, which stays incomplete for long periods because the broadcast side rarely receives messages.
See the source code below.
Because each `UniRun` holds a reference to the outer `StreamMultipleInputProcessor`'s available future, a massive number of `CompletableFuture` instances pile up that cannot be garbage collected.
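The stacking can be reproduced outside Flink with plain JDK futures. The sketch below is not Flink's `StreamMultipleInputProcessor` code; `naiveAnyAvailable` and `stackedDependents` are hypothetical names that only mimic the pattern described above (a fresh combined future per call, plus a `thenRun` on every input's future). It uses the JDK's `CompletableFuture.getNumberOfDependents()` to count the completions still waiting on the slow input's future.

```java
import java.util.concurrent.CompletableFuture;

final class UniRunStacking {

    // Hypothetical "any input available" combinator mimicking the described pattern:
    // every call allocates a new combined future and calls thenRun(...) on each input.
    static CompletableFuture<Void> naiveAnyAvailable(CompletableFuture<?> fastInput,
                                                     CompletableFuture<?> slowInput) {
        CompletableFuture<Void> combined = new CompletableFuture<>();
        fastInput.thenRun(() -> combined.complete(null));
        // slowInput is still pending, so a new UniRun is pushed onto its stack of
        // dependents; the lambda it holds keeps `combined` reachable.
        slowInput.thenRun(() -> combined.complete(null));
        return combined;
    }

    // Simulates `iterations` availability requests in which only the fast input
    // becomes available, then reports how many dependents the slow input holds.
    static int stackedDependents(int iterations) {
        CompletableFuture<Void> slowInput = new CompletableFuture<>(); // never completed here
        for (int i = 0; i < iterations; i++) {
            CompletableFuture<Void> fastInput = new CompletableFuture<>();
            CompletableFuture<Void> anyAvailable = naiveAnyAvailable(fastInput, slowInput);
            fastInput.complete(null); // the fast input becomes available
            anyAvailable.join();      // combined future is done, yet the slow UniRun remains
        }
        return slowInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("dependents on slow input: " + stackedDependents(100_000));
    }
}
```

Each iteration leaves one more `UniRun` on the slow input's future, so the count grows linearly with the number of availability requests, which matches the heap growth over hours of high-throughput processing.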
We modified `StreamMultipleInputProcessor.getAvailableFuture()` and verified that the issue is gone in our modified version.
We are willing to make a PR for this fix.
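The modified code is not included in this report. As an illustration only, one way to stop the stacking is to keep a single combined future and register a callback on an input's future only once per pending future instance. `AnyOfAvailabilityHelper`, `anyOf`, and `resetToUnavailable` are hypothetical names, not Flink API. The sketch assumes each input keeps returning the same future instance until that future completes (as the slow broadcast input does). It is not hardened for concurrency and ignores exceptional completion.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not Flink API): one combined future, and at most one
// registered callback per pending input future.
final class AnyOfAvailabilityHelper {
    // Last availability future registered for each input.
    private final CompletableFuture<?>[] lastRegistered;
    // volatile: input callbacks may complete it from another thread.
    private volatile CompletableFuture<Void> combined = new CompletableFuture<>();

    AnyOfAvailabilityHelper(int numInputs) {
        this.lastRegistered = new CompletableFuture<?>[numInputs];
    }

    CompletableFuture<Void> getAvailableFuture() {
        return combined;
    }

    // Start a new "unavailable" period once the previous combined future is done.
    void resetToUnavailable() {
        if (combined.isDone()) {
            combined = new CompletableFuture<>();
        }
    }

    // Link input `idx`'s availability future to the combined future.
    void anyOf(int idx, CompletableFuture<?> inputFuture) {
        // A pending future that is already registered is skipped: its callback reads the
        // `combined` field when it fires, so it completes whichever combined future is
        // current at that time. A completed future runs the callback immediately and
        // is not retained on any stack.
        if (lastRegistered[idx] != inputFuture || inputFuture.isDone()) {
            lastRegistered[idx] = inputFuture;
            inputFuture.thenRun(this::notifyCompletion); // exceptional completion ignored in this sketch
        }
    }

    private void notifyCompletion() {
        combined.complete(null);
    }

    public static void main(String[] args) {
        AnyOfAvailabilityHelper helper = new AnyOfAvailabilityHelper(2);
        CompletableFuture<Void> slowInput = new CompletableFuture<>(); // rarely completes
        for (int i = 0; i < 100_000; i++) {
            helper.resetToUnavailable();
            CompletableFuture<Void> fastInput = new CompletableFuture<>();
            helper.anyOf(0, fastInput);
            helper.anyOf(1, slowInput);
            fastInput.complete(null); // the fast input becomes available
            helper.getAvailableFuture().join();
        }
        System.out.println("dependents on slow input: " + slowInput.getNumberOfDependents());
    }
}
```

With this approach the slow input's future keeps a single dependent no matter how many times availability is requested, while a later completion of the slow input still completes the current combined future.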
Heap dump file: flink-completablefuture-issue.tar.xz
PS: This is a YourKit heap dump and may not be compatible with tools that expect HPROF files.