Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19151

Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used





      It's a Yarn protocol that the requested container resource will be normalized for allocation. That means, the allocated container may have different resource (larger than or equal to) compared to what is requested.

      Currently, Flink matches the allocated containers to the original requests by reading the Yarn configurations and calculate how the requested resources should be normalized.

      What has been overlooked is that, Yarn FairScheduler (and its subclass SLSFairScheduler) has overridden the normalization behavior. To be specific,

      • By default, Yarn normalize container resources to integer multiple of "yarn.scheduler.minimum-allocation-[mb|vcores]"
      • FairScheduler normalize container resources to integer multiple of "yarn.resource-types.[memory-mb|vcores].increment-allocation" (or the deprecated keys "yarn.scheduler.increment-allocation-[mb|vcores]"), while making sure the resource is no less than "yarn.scheduler.minimum-allocation-[mb|vcores]"

      Proposal for short term solution

      To fix this problem, a quick and easy way is to also read Yarn configuration and learn which scheduler is used, and perform normalization calculations accordingly. This should be good enough to cover behaviors of all the schedulers that Yarn currently provides. The limitation is that, Flink will not be able to deal with custom Yarn schedulers which override the normalization behaviors.

      Proposal for long term solution

      For long term, it would be good to use Yarn ContainerRequest#allocationRequestId to match the allocated containers with the original requests, so that Flink no longer needs to understand how Yarn normalize container resources. 

      Yarn ContainerRequest#allocationRequestId is introduced in Hadoop 2.9, while ATM Flink claims to be compatible with Hadoop 2.4+. Therefore, this solution would not work at the moment.

      Another idea is to support various Hadoop versions with different container matching logics. We can abstract the container matching logics into a dedicating component, and provide different implementations for it. This will allow Flink to take advantages of the new versions (e.g., work well with custom schedulers), while stay compatible with the old versions without those advantages.

      Given that we need the resource based matching anyway for the old Hadoop versions, and the cost for maintaining two sets of matching logics, I tend to think this approach as a back-up option to be worked on when we indeed see a need for it.


          Issue Links



              • Assignee:
                csbliss jinhai
                xintongsong Xintong Song
              • Votes:
                0 Vote for this issue
                9 Start watching this issue


                • Created: