Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 3.1.3, 2.10.1, 3.2.4, 3.3.4
Description
When a Flink job runs on YARN 2.10.1 with the capacity scheduler, the user's used resource is reported incorrectly after the job manager fails and a new application attempt starts.
Steps to reproduce:
1. Create a capacity_test queue. The queue resources are as follows:
Queue State: RUNNING
Used Capacity: <memory:4096, vCores:4> (84.7%)
Configured Capacity: <memory:0, vCores:0>
Configured Max Capacity: unlimited
Effective Capacity: <memory:20479, vCores:4> (4.0%)
Effective Max Capacity: <memory:512000, vCores:118> (100.0%)
Absolute Used Capacity: 3.4%
Absolute Configured Capacity: 4.0%
Absolute Configured Max Capacity: 100.0%
Used Resources: <memory:4096, vCores:4>
2. Submit a Flink job to YARN with a parallelism of 10 and a container resource of 1c, 1024m.
flink run -m yarn-cluster -yjm 1024 -ytm 1024 -parallelism 10 -yqu capacity_test /cloud/service/flink/examples/streaming/WindowJoin.ja
Because the user's max resource in this queue is 4c, 10g, this job can only run 5 containers. At this point, the user's used resource is as follows:
User Name | Max Resource | Weight | Used Resource | Max AM Resource | Used AM Resource | Schedulable Apps | Non-Schedulable Apps |
---|---|---|---|---|---|---|---|
hadoop | <memory:20480, vCores:4> | 1.0 | <memory:5120, vCores:5> | <memory:10240, vCores:2> | <memory:2048, vCores:2> | 2 |
3. Kill the job manager process with kill -9. YARN then removes this application attempt, and the user is removed from UsersManager as well.
In LeafQueue#removeApplicationAttempt, when the user's total number of applications drops to 0, the user is removed from usersManager.
private void removeApplicationAttempt(
    FiCaSchedulerApp application, String userName) {
  try {
    writeLock.lock();
    //...
    user.finishApplication(wasActive);
    if (user.getTotalApplications() == 0) {
      usersManager.removeUser(application.getUser());
    }
    //...
  }
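The bookkeeping around this removal can be sketched with a toy model. This is only an illustration under stated assumptions: UserAccountingSketch, its User class, and all of its methods are hypothetical names invented here, not YARN classes, and the model simplifies things by keeping every container alive across the attempt. It shows that once the user entry is dropped at zero applications, a later re-created entry starts from zero even though containers are still holding resources.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: every class and method here is hypothetical, not a YARN API.
public class UserAccountingSketch {
    static final class User {
        int totalApplications;
        long usedMemoryMb;
    }

    private final Map<String, User> users = new HashMap<>();
    // Memory held by live containers, tracked independently of User objects.
    private long liveContainerMemoryMb;

    private User getOrCreateUser(String name) {
        return users.computeIfAbsent(name, n -> new User());
    }

    public void addAttempt(String userName) {
        getOrCreateUser(userName).totalApplications++;
    }

    public void allocate(String userName, long memoryMb) {
        getOrCreateUser(userName).usedMemoryMb += memoryMb;
        liveContainerMemoryMb += memoryMb;
    }

    // keepContainers == true stands in for keep_containers_across_application_attempts.
    public void removeAttempt(String userName, boolean keepContainers) {
        User user = users.get(userName);
        if (!keepContainers) {
            // Containers are released, so both counters go down together.
            liveContainerMemoryMb -= user.usedMemoryMb;
            user.usedMemoryMb = 0;
        }
        user.totalApplications--;
        if (user.totalApplications == 0) {
            // Mirrors the removal in the snippet above: the usage counter is lost.
            users.remove(userName);
        }
    }

    public long trackedUsedMb(String userName) {
        User user = users.get(userName);
        return user == null ? 0 : user.usedMemoryMb;
    }

    public long liveMb() {
        return liveContainerMemoryMb;
    }

    public static void main(String[] args) {
        UserAccountingSketch queue = new UserAccountingSketch();
        queue.addAttempt("hadoop");
        for (int i = 0; i < 5; i++) {
            queue.allocate("hadoop", 1024);
        }
        queue.removeAttempt("hadoop", true); // attempt fails, containers kept
        queue.addAttempt("hadoop");          // new attempt re-creates the user
        System.out.println("tracked=" + queue.trackedUsedMb("hadoop")
            + " live=" + queue.liveMb());
    }
}
```

In this model the tracked usage and the live usage diverge only when the user entry is removed while containers are kept; with keepContainers set to false the two counters stay in step.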
4. YARN starts a new job manager attempt, so the User object for hadoop is recreated and its used resource is initialized to 0. At the same time, because the Flink job sets ApplicationSubmissionContextProto#keep_containers_across_application_attempts to true, the old containers keep running, but their resources are not counted in the recreated User. As a result, the user's used resource is incorrect, and the real used resource exceeds the max resource, like this: