Details

Type: Bug
Status: Closed
Priority: Critical
Resolution: Workaround
Affects Version/s: 1.19.0
Fix Version/s: None
Description
The issue is easy to reproduce in batch mode using hybrid shuffle and a fairly large total number of slots in the cluster; even a low parallelism (60) still triggers it.
Note: a partial thread dump is attached to illustrate the issue.
When `NetworkBufferPool.internalRecycleMemorySegments` is called, the following call chain may occur:

```
NetworkBufferPool.internalRecycleMemorySegments
  -> LocalBufferPool.onGlobalPoolAvailable
  -> LocalBufferPool.checkAndUpdateAvailability
  -> LocalBufferPool.requestMemorySegmentFromGlobalWhenAvailable
```
Because of the implementation of `requestMemorySegmentFromGlobalWhenAvailable`, several `LocalBufferPool` instances are notified as soon as a segment becomes available in the global pool:

```java
networkBufferPool.getAvailableFuture().thenRun(this::onGlobalPoolAvailable);
```
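To make the fan-out concrete, here is a minimal, self-contained sketch. This is not Flink's actual code; `LocalPool`, its fields, and the pool names are simplified stand-ins. It shows how several pools registered on the same availability future all get their callbacks run, one after another, on the single thread that completes the future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FanOutSketch {
    // Simplified stand-in for LocalBufferPool.
    static class LocalPool {
        private final String name;
        LocalPool(String name) { this.name = name; }

        // Simplified stand-in for onGlobalPoolAvailable: runs under this pool's lock.
        synchronized void onGlobalPoolAvailable() {
            System.out.println(Thread.currentThread().getName()
                    + " notified pool " + name);
        }
    }

    public static void main(String[] args) {
        // Simplified stand-in for NetworkBufferPool.getAvailableFuture().
        CompletableFuture<Void> availableFuture = new CompletableFuture<>();

        // Each local pool registers a callback in the same style as
        // requestMemorySegmentFromGlobalWhenAvailable.
        List<LocalPool> pools = new ArrayList<>();
        for (String name : List.of("A", "B", "C", "D", "E", "F")) {
            LocalPool pool = new LocalPool(name);
            pools.add(pool);
            availableFuture.thenRun(pool::onGlobalPoolAvailable);
        }

        // A single completion (e.g. a segment being recycled back to the
        // global pool) synchronously runs *all* registered callbacks on the
        // completing thread, locking each pool in turn.
        availableFuture.complete(null);
    }
}
```

The key point is that the completing thread walks through many pools' locks in sequence, with no global ordering guarantee relative to any other thread doing the same.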
The issue arises when two or more threads go through this specific code path at the same time.
Each thread notifies the same set of `LocalBufferPool` instances by invoking `onGlobalPoolAvailable` on each of them, acquiring a series of locks along the way, and not necessarily in the same order.
As an example, assume there are six `LocalBufferPool` instances A, B, C, D, E and F:
Thread 1 calls `onGlobalPoolAvailable` on A, B, C and D: it locks A, B and C, then blocks trying to lock D.
Thread 2 calls `onGlobalPoolAvailable` on D, E, F and A: it locks D, E and F, then blocks trying to lock A.
==> Threads 1 and 2 are deadlocked, each holding a lock the other is waiting for.
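To make the circular wait concrete, here is a minimal, self-contained Java sketch, with hypothetical names rather than Flink code, that reproduces the same lock-ordering deadlock with two monitors, mirroring the two `ArrayDeque` monitors in the thread dump below:

```java
import java.util.ArrayDeque;

public class DeadlockSketch {
    // Stand-ins for the per-pool ArrayDeque monitors seen in the thread dump.
    static final ArrayDeque<Object> poolA = new ArrayDeque<>();
    static final ArrayDeque<Object> poolD = new ArrayDeque<>();

    static void notifyPools(ArrayDeque<Object> first, ArrayDeque<Object> second) {
        synchronized (first) {        // lock the first pool...
            sleepQuietly(100);        // widen the race window
            synchronized (second) {   // ...then block on the second
                System.out.println(Thread.currentThread().getName() + " got both locks");
            }
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Thread 1 locks pool A, then waits for pool D.
        Thread t1 = new Thread(() -> notifyPools(poolA, poolD), "thread-1");
        // Thread 2 locks pool D, then waits for pool A: a circular wait.
        Thread t2 = new Thread(() -> notifyPools(poolD, poolA), "thread-2");
        t1.start();
        t2.start();
        // Neither thread can proceed; a thread dump taken now shows the same
        // "locked ArrayDeque X / blocked on ArrayDeque Y" pattern as the report.
    }
}
```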
The attached thread dump captured this issue:
The first thread locked `java.util.ArrayDeque@41d6a3bb` and is blocked on `java.util.ArrayDeque@e2b5e34`.
The second thread locked `java.util.ArrayDeque@e2b5e34` and is blocked on `java.util.ArrayDeque@41d6a3bb`.
Note that I'm not familiar enough with Flink internals to know what the fix should be, but I'm happy to submit a PR if someone tells me what the correct behaviour should be.