Details
- Type: Sub-task
- Status: Closed
- Priority: Major
- Resolution: Duplicate
Description
In the current ThreadSafeUnaligner, the variable `storeNewBuffers` actually serves two purposes. The first is to avoid counting a processed barrier, or a received-barrier notification, twice for the same channel. The second is to decide whether a notified buffer should be spilled by the writer.
In RemoteInputChannel, the `lastRequestedCheckpointId` and `receivedCheckpointId` states already control whether a received buffer is notified to the unaligner component. In other words, whenever `RemoteInputChannel` decides to notify a received buffer, that buffer always needs to be spilled in ThreadSafeUnaligner. We can therefore remove the related condition inside ThreadSafeUnaligner#notifyBufferReceived and make the semantics of `storeNewBuffers` clearer.
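The proposed split of responsibilities can be illustrated with a minimal sketch. This is not the Flink code: `UnalignerSketch`, `Channel`, `Unaligner`, `requestCheckpoint`, `onBarrier`, `onBuffer` and the `String` buffers are hypothetical stand-ins, and the notify condition `receivedCheckpointId < lastRequestedCheckpointId` is an assumption about how the two ids might be compared. The point is only that the channel makes the decision, so the unaligner side can spill unconditionally.

```java
import java.util.ArrayList;
import java.util.List;

public class UnalignerSketch {

    /** Hypothetical stand-in for ThreadSafeUnaligner after the proposed change. */
    public static class Unaligner {
        public final List<String> spilled = new ArrayList<>();

        public void notifyBufferReceived(String buffer) {
            // No storeNewBuffers check here: the channel has already decided
            // that this buffer belongs to the in-flight checkpoint.
            spilled.add(buffer);
        }
    }

    /** Hypothetical stand-in for RemoteInputChannel, reduced to the two ids. */
    public static class Channel {
        private final Unaligner unaligner;
        private long lastRequestedCheckpointId = -1;
        private long receivedCheckpointId = -1;

        public Channel(Unaligner unaligner) {
            this.unaligner = unaligner;
        }

        /** A checkpoint was triggered and this channel must start persisting buffers. */
        public void requestCheckpoint(long checkpointId) {
            lastRequestedCheckpointId = checkpointId;
        }

        /** The barrier for the given checkpoint arrived on this channel. */
        public void onBarrier(long checkpointId) {
            receivedCheckpointId = checkpointId;
        }

        /** Assumed rule: notify only while a requested checkpoint's barrier is still pending. */
        public void onBuffer(String buffer) {
            if (receivedCheckpointId < lastRequestedCheckpointId) {
                unaligner.notifyBufferReceived(buffer);
            }
        }
    }
}
```

In this sketch, a buffer arriving before any checkpoint request, or after the barrier has been received, never reaches the unaligner, so `storeNewBuffers` would only be needed for the barrier-counting purpose.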
Issue Links
- is fixed by: FLINK-19026 Improve threading model of Unaligner (Resolved)