Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
None
Description
MiniClusterTestEnvironment#triggerTaskManagerFailover checks the job status to detect a restart:
terminateTaskManager();
CommonTestUtils.waitForJobStatus(
        jobClient,
        Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
        Deadline.fromNow(Duration.ofMinutes(5)));
afterFailAction.run();
startTaskManager();
However, `waitForJobStatus` polls only every 100ms, while the restart can happen within 100ms. The check can therefore easily miss the actual restart and wait forever (or until the next restart happens because slots are missing).
We should instead use the metric `numRestarts`: read it before inducing the error, then wait until the counter has increased.
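A minimal sketch of the counter-based wait, to illustrate why it is not affected by the polling interval. It is not the actual fix: `RestartCounterWait`, `waitForCounterAbove`, and the `LongSupplier` standing in for the `numRestarts` metric are hypothetical names, and how the metric is read from the MiniCluster is not shown here.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class RestartCounterWait {

    /**
     * Polls a monotonically increasing counter until it exceeds the given baseline.
     * Because the counter never goes back down, a restart that starts and finishes
     * between two polls is still observed on the next poll.
     */
    public static long waitForCounterAbove(
            LongSupplier counter, long baseline, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long current = counter.getAsLong();
            if (current > baseline) {
                return current;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Counter did not exceed " + baseline);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: stand-in for the job's numRestarts metric.
        AtomicLong numRestarts = new AtomicLong();

        // 1. Read the counter before inducing the failure.
        long before = numRestarts.get();

        // 2. Simulate a restart that completes before the first poll.
        numRestarts.incrementAndGet();

        // 3. The wait still succeeds, unlike a poll for a transient job status.
        long after = waitForCounterAbove(
                numRestarts::get, before, Duration.ofSeconds(5), Duration.ofMillis(100));
        System.out.println("restarts: " + before + " -> " + after);
    }
}
```

The important detail is the order of operations: the baseline has to be captured before the TaskManager is terminated, otherwise a fast restart could already be included in it.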
Here is an excerpt from a log where the restart was not detected in time:
42769 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RUNNING to RESTARTING.
42774 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [ead7cad050ec7a264c0dba0b6e6a6ad9].
42775 [flink-akka.actor.default-dispatcher-23] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 543035cf9e19317f92ee559b70ac70bd
42776 [flink-akka.actor.default-dispatcher-22] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:RELEASING, resource profile: ResourceProfile{taskHeapMemory=170.667gb (183251937962 bytes), taskOffHeapMemory=170.667gb (183251937962 bytes), managedMemory=13.333mb (13981013 bytes), networkMemory=10.667mb (11184810 bytes)}, allocationId: ead7cad050ec7a264c0dba0b6e6a6ad9, jobId: 543035cf9e19317f92ee559b70ac70bd).
43780 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RESTARTING to RUNNING.
43783 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 543035cf9e19317f92ee559b70ac70bd from Checkpoint 11 @ 1629093422900 for 543035cf9e19317f92ee559b70ac70bd located at <checkpoint-not-externally-addressable>.
43798 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
43800 [SourceCoordinator-Source: Tested Source -> Sink: Data stream collect sink] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 0 to checkpoint 11 for source Source: Tested Source -> Sink: Data stream collect sink to checkpoint.
43801 [flink-akka.actor.default-dispatcher-26] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Tested Source -> Sink: Data stream collect sink (1/1) (35c0ee7183308af02db4b09152f1457e) switched from CREATED to SCHEDULED.
UPDATE: A better implementation would use the RestClient to detect whether tasks have failed.
Issue Links
- blocks: FLINK-23969 Test Pulsar source end 2 end (Closed)
- fixes: FLINK-24012 PulsarSourceITCase.testTaskManagerFailure fails due to NoResourceAvailableException (Closed)
- is duplicated by: FLINK-24174 MiniClusterTestEnvironment‘s triggerTaskManagerFailover may stuck in CommonTestUtils.waitForJobStatus() (Closed)
- is related to: FLINK-23944 PulsarSourceITCase.testTaskManagerFailure is instable (Closed)
- links to