  1. Flink
  2. FLINK-14589

Redundant slot requests with the same AllocationID lead to inconsistent slot table




      NOTE: We found this issue in 1.6.2, and I checked that the relevant code is still in mainline. What I am not sure about, however, is whether other slot-related fixes made after 1.6.2 (such as FLINK-11059 and FLINK-12863) would prevent the initial cause of this issue from happening. So far I have not found a fix for the issue described here, so I am opening this one. Please feel free to mark it as a duplicate if another issue already covers it. Note that we have already picked FLINK-9912, which turned out to be a major fix for the slot allocation failure issue. I am noting the ramification for that issue in case others experience the same problem.


      When requestSlot is called from ResourceManager (RM) to TaskManager (TM), TM first reserves the requested slot by marking it as ALLOCATED, offers the slot to JM, and marks the slot as ACTIVE once it gets an acknowledgement from JM. This three-way communication for slot allocation is identified by an AllocationID, which is initially generated by JM. TM reserves a slot by calling TaskSlotTable.allocateSlot if the requested slot number (i.e., slot index) is free to use. The major data structure is TaskSlot, indexed by slot index. Once the slot is marked as ALLOCATED with a given AllocationID, allocateSlot updates other maps such as allocationIDTaskSlotMap, keyed by AllocationID, and slotsPerJob, keyed by JobID. When updating allocationIDTaskSlotMap, it directly calls allocationIDTaskSlotMap.put(allocationId, taskSlot), which overwrites an existing entry if one is already there with the same AllocationID.

      This makes TaskSlot and allocationIDTaskSlotMap inconsistent: the former says two slots are allocated with the same AllocationID, while the latter says the AllocationID only has the latest task slot. In this state, once the slot is freed, freeSlot is driven by AllocationID, so it fetches the slot index from allocationIDTaskSlotMap (i.e., the slot whose request arrived later), marks that slot free, and removes the entry from allocationIDTaskSlotMap. The old task slot, however, is still marked as allocated. It becomes a zombie and can never be freed. This can cause permanent slot allocation failure if TM slots are statically and tightly provisioned and the resource manager does not actively spawn new TMs when slots are unavailable (e.g., Kubernetes without active mode integration, which is not yet available).
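
      The overwrite described above can be reproduced in isolation. The following is a minimal sketch, not Flink's actual TaskSlotTable: NaiveSlotTable, its fields, and the string AllocationIDs are hypothetical stand-ins, and only the slot-index state plus one AllocationID-keyed map are modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the TM slot table; not Flink's real class. */
class NaiveSlotTable {
    // Per-slot state indexed by slot index: null means free, otherwise the owning AllocationID.
    private final String[] slotOwner;
    // Reverse index keyed by AllocationID, standing in for allocationIDTaskSlotMap.
    private final Map<String, Integer> allocationIdToSlot = new HashMap<>();

    NaiveSlotTable(int numberOfSlots) {
        slotOwner = new String[numberOfSlots];
    }

    /** Reserves the slot if it is free; does not check whether allocationId is already in use. */
    boolean allocateSlot(int index, String allocationId) {
        if (slotOwner[index] != null) {
            return false; // slot index already taken
        }
        slotOwner[index] = allocationId;
        allocationIdToSlot.put(allocationId, index); // silently replaces an existing entry
        return true;
    }

    /** Frees by AllocationID, so only the slot the map currently points at is released. */
    int freeSlot(String allocationId) {
        Integer index = allocationIdToSlot.remove(allocationId);
        if (index == null) {
            return -1; // no-op: the AllocationID is unknown to the map
        }
        slotOwner[index] = null;
        return index;
    }

    boolean isSlotFree(int index) {
        return slotOwner[index] == null;
    }
}

class ZombieSlotDemo {
    public static void main(String[] args) {
        NaiveSlotTable table = new NaiveSlotTable(2);
        table.allocateSlot(0, "AID1");
        table.allocateSlot(1, "AID1"); // redundant request: map entry now points at slot 1

        System.out.println("freed slot index: " + table.freeSlot("AID1")); // 1
        System.out.println("slot 0 free: " + table.isSlotFree(0));         // false (zombie)
        System.out.println("second free: " + table.freeSlot("AID1"));      // -1 (no-op)
    }
}
```

      After the single freeSlot call, slot 0 is still owned by AID1 but no map entry leads back to it, which is the zombie state.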


      From my observation, redundant slot requests with the same AllocationID and different slot indices should be rare, but they can happen through a race condition, especially when repeated fail-overs and heartbeat timeouts (primarily caused by transient resource overload, not permanent network partition or node outage) are taking place. The following is a detailed scenario that could lead to this issue (AID is AllocationID):

      1. AID1 is requested by JM and put in the pending request queue in RM.
      2. RM picks slot number 1 (Slot1) from freeSlots and performs requestSlot with Slot1 and AID1. This slot request is now in flight.
      3. In the meantime, Slot1 is occupied by AID2 in TM because of a delayed slot request, and TM sends a slot report via heartbeat to RM saying Slot1 is already allocated with AID2.
      4. RM's heartbeat handler identifies that Slot1 is occupied with a different AID (AID2), so it rejects the pending request sent in step 2.
      5. handleFailedSlotRequest puts the rejected AID1 back into the pending requests by retrying the slot request. It now picks another available slot, say Slot2, so the retried slot request with Slot2 and AID1 is in flight.
      6. In the meantime, Slot1 occupied by AID2 is freed (by a disconnection from JM, or by releasing all the tasks in the slot on cancellation/failure; the latter was observed).
      7. The in-flight slot request (Slot1, AID1) from step 2 arrives at TM and succeeds because Slot1 is free to allocate. TM offers Slot1 to JM, which acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point, allocationIDTaskSlotMap[AID1] = Slot1 in TM and allocatedSlots[AID1] = Slot1 in JM.
      8. The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. As Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and overwrites allocationIDTaskSlotMap[AID1] with Slot2.
      9. Since step 7, JM has allocatedSlots[AID1] = Slot1, so JM rejects the offer because the same AID already occupies another slot.
      10. TM gets the rejected offer for (Slot2, AID1) and frees Slot2. As part of that, it removes allocationIDTaskSlotMap[AID1]. Slot1 is still marked as ALLOCATED with AID1, but allocationIDTaskSlotMap contains nothing for AID1.
      11. From this point on, both RM and JM believe that Slot1 is allocated for AID1, and JM proceeds with task deployment using AID1. In TM, AID1 is not allocated at all because allocationIDTaskSlotMap[AID1] = null. Task deployment fails with TaskSubmissionException("No task slot allocated for job ID").
      12. Any slot release from JM (by another heartbeat timeout) removes the allocated slot (Slot1, AID1) from allocatedSlots and availableSlots, whereas freeing slots with AID1 in TM is a no-op because allocationIDTaskSlotMap[AID1] = null.
      13. Any further scheduling fails with NoResourceAvailableException("Could not allocate all requires slots within timeout"), unless an active resource manager is used.

      Repro method

      The repro I have is more of a stress test than a deterministic one: a constantly failing app (which also does not react to cancellation, to prolong fail-over) with a short heartbeat timeout (<=1s) to emulate resource overload without imposing arbitrary resource overloads (I enabled DEBUG logging, which could also add a bit more load). Apart from the repro test, the real applications that run into this issue are heavily loaded (high slot oversubscription) and fail over continuously (because of code errors).


      I've tested a simple solution: TaskSlotTable.allocateSlot checks whether allocationIDTaskSlotMap already has a task slot entry for the requested AID and, if so, rejects the allocation up front. The redundant slot request then fails fast without reaching reservation and slot offer, which prevents the allocationIDTaskSlotMap overwrite and the inconsistent allocation tables. In the above example, the fix replaces steps 8, 9, and 10 with RM getting a SlotAllocationException and handling the failed slot request. RM may retry the pending request via handleFailedSlotRequest, but it stops retrying once the next heartbeat reports that the pending slot was allocated by another AID, or eventually when the slot request times out. My test with the fix ran for a while, hitting this race multiple times, and survived without slot allocation failure.
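
      The up-front check can be sketched as follows. This is an illustration under assumptions, not the tested patch: GuardedSlotTable and its members are hypothetical names, the table is reduced to one array and one map, and a boolean return stands in for the rejection that RM would see as a SlotAllocationException.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical slot table that rejects a second allocation for an AllocationID already in use. */
class GuardedSlotTable {
    // Per-slot state indexed by slot index: null means free, otherwise the owning AllocationID.
    private final String[] slotOwner;
    // Reverse index keyed by AllocationID, standing in for allocationIDTaskSlotMap.
    private final Map<String, Integer> allocationIdToSlot = new HashMap<>();

    GuardedSlotTable(int numberOfSlots) {
        slotOwner = new String[numberOfSlots];
    }

    /** Returns false, without touching any state, for a busy slot or a known AllocationID. */
    boolean allocateSlot(int index, String allocationId) {
        if (allocationIdToSlot.containsKey(allocationId)) {
            return false; // redundant request: fail fast before reservation and slot offer
        }
        if (slotOwner[index] != null) {
            return false; // slot index already taken
        }
        slotOwner[index] = allocationId;
        allocationIdToSlot.put(allocationId, index);
        return true;
    }

    /** Frees the single slot owned by the AllocationID; returns its index or -1 if unknown. */
    int freeSlot(String allocationId) {
        Integer index = allocationIdToSlot.remove(allocationId);
        if (index == null) {
            return -1;
        }
        slotOwner[index] = null;
        return index;
    }

    boolean isSlotFree(int index) {
        return slotOwner[index] == null;
    }
}

class GuardedSlotDemo {
    public static void main(String[] args) {
        GuardedSlotTable table = new GuardedSlotTable(2);
        System.out.println(table.allocateSlot(0, "AID1")); // true
        System.out.println(table.allocateSlot(1, "AID1")); // false: rejected up front
        System.out.println(table.isSlotFree(1));           // true: slot 1 was never reserved
        System.out.println(table.freeSlot("AID1"));        // 0: the only slot of AID1 is released
        System.out.println(table.isSlotFree(0));           // true: no zombie remains
    }
}
```

      Because the rejected request never modifies either structure, the slot-index state and the AllocationID map stay in agreement, and a later freeSlot releases exactly the slot that was allocated.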

      As my experience is with a somewhat old version (1.6.2), it would be good to discuss what other unexpected things could happen and which related fixes have already been incorporated in mainline.





              hwanju Hwanju Kim


