Flink / FLINK-23833

Cache of ShuffleDescriptors should be individually cleaned up


Details

    Description

      In FLINK-23005, we introduced a cache of the compressed, serialized value of ShuffleDescriptors to improve deployment performance. To make sure the cache does not stay around too long and become a burden for GC, it is cleaned up when a partition is released or reset for a new execution. In the implementation, the cache of the entire IntermediateResult is cleaned up at once, because it assumes that a partition is released only when the entire IntermediateResult is released.
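
A minimal sketch of the caching idea, assuming plain strings stand in for ShuffleDescriptors and the cache is keyed by an IntermediateResult ID. `CompressedDescriptorCache`, `getOrCompute`, and `invalidateResult` are hypothetical names, not Flink's API. The descriptors are serialized and compressed once, and later deployments reuse the same bytes until the whole result is invalidated.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;

// Hypothetical sketch: one compressed, serialized value per IntermediateResult.
// Strings stand in for ShuffleDescriptors; this is not Flink's actual API.
class CompressedDescriptorCache {
    private final Map<String, byte[]> cacheByResultId = new HashMap<>();
    private int serializationCount = 0;

    // Returns the cached bytes, serializing and compressing only on a cache miss.
    byte[] getOrCompute(String resultId, List<String> descriptors) {
        return cacheByResultId.computeIfAbsent(resultId, id -> serializeAndCompress(descriptors));
    }

    // Drops the cached value of the whole result (e.g. on release or reset).
    void invalidateResult(String resultId) {
        cacheByResultId.remove(resultId);
    }

    int serializationCount() {
        return serializationCount;
    }

    private byte[] serializeAndCompress(List<String> descriptors) {
        serializationCount++;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream also finishes the Deflater stream.
        try (ObjectOutputStream out = new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
            out.writeObject(new ArrayList<>(descriptors));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Keying by the whole result is what makes invalidation coarse: a single `invalidateResult` call drops the bytes that every consumer of that result shares.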

      However, after FLINK-22017, a BLOCKING result partition can be consumed individually. This also means that a result partition no longer needs to wait for the other result partitions and can be released individually. After this change, the following scenario can occur: when one result partition is finished, the cache of the whole IntermediateResult is deleted from the blob server, while the consumers of other result partitions of the same IntermediateResult have just been deployed to TaskExecutors. When those TaskExecutors try to download their TDDs (TaskDeploymentDescriptors) from the blob server, they find that the blob has been deleted and get stuck.
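
The race can be modeled with a toy example, assuming a map stands in for the blob server and every consumer TDD carries only the blob key of its result's cached descriptors. `ToyBlobServer`, `WholeResultCleanup`, and the keys are hypothetical. The point is only that a cleanup keyed by the whole result removes a blob that a just-deployed consumer still needs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the blob server: blob key -> offloaded bytes.
class ToyBlobServer {
    private final Map<String, byte[]> blobs = new HashMap<>();

    void put(String key, byte[] value) { blobs.put(key, value); }

    void delete(String key) { blobs.remove(key); }

    // An empty result models a TaskExecutor that cannot fetch its TDD payload.
    Optional<byte[]> download(String key) { return Optional.ofNullable(blobs.get(key)); }
}

// Hypothetical cleanup policy: one blob per result, deleted as soon as
// any partition of that result is released.
class WholeResultCleanup {
    private final ToyBlobServer blobServer;
    private final Set<String> offloadedResults = new HashSet<>();

    WholeResultCleanup(ToyBlobServer blobServer) { this.blobServer = blobServer; }

    // Offloads the shared descriptors once; every consumer TDD gets the same key.
    String offload(String resultId, byte[] descriptors) {
        if (offloadedResults.add(resultId)) {
            blobServer.put(resultId, descriptors);
        }
        return resultId;
    }

    // partitionIndex is ignored: the blob of the whole result is deleted, even if
    // consumers of other partitions have not downloaded their TDDs yet.
    void onPartitionReleased(String resultId, int partitionIndex) {
        offloadedResults.remove(resultId);
        blobServer.delete(resultId);
    }
}
```

In this model, releasing partition 0 before the consumer of partition 1 calls `download` leaves that consumer with an empty result, which corresponds to the stuck TaskExecutor described above.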

      This bug only occurs for jobs with POINTWISE BLOCKING edges, and only when blob.offload.minsize is also set to an extremely small value: the ShuffleDescriptors of POINTWISE BLOCKING edges are usually small, so they are offloaded to the blob server only under such a setting. To solve this issue, we just need to clean up the cache of ShuffleDescriptors individually rather than for the entire IntermediateResult.
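
A minimal sketch of individual cleanup, assuming the cache is keyed by (result ID, partition index) so that releasing one partition leaves the entries of its siblings intact. `PartitionKey` and `PerPartitionDescriptorCache` are hypothetical names, and the actual Flink fix may key the cache differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical cache key: one entry per result partition instead of per result.
final class PartitionKey {
    final String resultId;
    final int partitionIndex;

    PartitionKey(String resultId, int partitionIndex) {
        this.resultId = resultId;
        this.partitionIndex = partitionIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PartitionKey)) return false;
        PartitionKey other = (PartitionKey) o;
        return partitionIndex == other.partitionIndex && resultId.equals(other.resultId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(resultId, partitionIndex);
    }
}

// Hypothetical sketch: releasing one partition drops only that partition's
// cached descriptors; other partitions of the same result keep theirs.
class PerPartitionDescriptorCache {
    private final Map<PartitionKey, byte[]> cache = new HashMap<>();

    void put(String resultId, int partitionIndex, byte[] compressedDescriptors) {
        cache.put(new PartitionKey(resultId, partitionIndex), compressedDescriptors);
    }

    boolean contains(String resultId, int partitionIndex) {
        return cache.containsKey(new PartitionKey(resultId, partitionIndex));
    }

    void onPartitionReleased(String resultId, int partitionIndex) {
        cache.remove(new PartitionKey(resultId, partitionIndex));
    }

    // Hypothetical: drop every entry of a result, e.g. when it is reset for a new execution.
    void onResultReset(String resultId) {
        cache.keySet().removeIf(key -> key.resultId.equals(resultId));
    }
}
```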

            People

              Thesharing Zhilong Hong
              Votes: 0
              Watchers: 3