  1. Flink
  2. FLINK-23807

Use RestClient to detect restarts in MiniClusterTestEnvironment#triggerTaskManagerFailover


Details

    Description

      MiniClusterTestEnvironment#triggerTaskManagerFailover checks the job status to detect a restart:

              terminateTaskManager();
              CommonTestUtils.waitForJobStatus(
                      jobClient,
                      Arrays.asList(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING),
                      Deadline.fromNow(Duration.ofMinutes(5)));
              afterFailAction.run();
              startTaskManager();
      

      However, `waitForJobStatus` polls only every 100 ms, while a restart can complete in less than 100 ms. The poll can therefore easily miss the actual restart and wait forever (or until the next restart happens because slots are missing).
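
      To illustrate the race, here is a minimal, self-contained sketch. It does not use any Flink classes: the timeline, the state names as plain strings, and the helpers `statusAt`, `pollerObserves`, and `countEntries` are all hypothetical and exist only for this example. It replays a fixed timeline in which the job is in RESTARTING for 40 ms, samples it every 100 ms, and compares that with counting the transitions.

```java
import java.util.Map;
import java.util.TreeMap;

class PollingRaceSketch {

    /** Returns the status in effect at time t (latest transition at or before t). */
    static String statusAt(TreeMap<Long, String> timeline, long t) {
        Map.Entry<Long, String> entry = timeline.floorEntry(t);
        return entry == null ? null : entry.getValue();
    }

    /** Samples the timeline every pollMillis and reports whether any sample equals wanted. */
    static boolean pollerObserves(
            TreeMap<Long, String> timeline, String wanted, long pollMillis, long endMillis) {
        for (long t = 0; t <= endMillis; t += pollMillis) {
            if (wanted.equals(statusAt(timeline, t))) {
                return true;
            }
        }
        return false;
    }

    /** Counts how often the job entered wanted; this only grows, like a restart counter. */
    static long countEntries(TreeMap<Long, String> timeline, String wanted) {
        long count = 0;
        for (String status : timeline.values()) {
            if (wanted.equals(status)) {
                count++;
            }
        }
        return count;
    }

    /** Hypothetical timeline: the job restarts between two polls. */
    static TreeMap<Long, String> shortRestartTimeline() {
        TreeMap<Long, String> timeline = new TreeMap<>();
        timeline.put(0L, "RUNNING");
        timeline.put(130L, "RESTARTING"); // restart begins after the poll at 100 ms
        timeline.put(170L, "RUNNING");    // and is over before the poll at 200 ms
        return timeline;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> timeline = shortRestartTimeline();
        System.out.println("poller saw RESTARTING: "
                + pollerObserves(timeline, "RESTARTING", 100, 1000));
        System.out.println("restarts counted: " + countEntries(timeline, "RESTARTING"));
    }
}
```

      In this sketch the sampled status never shows the intermediate state, while the transition count still records that a restart happened.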

      Instead, we should use the `numRestarts` metric: read it before inducing the failure, then wait until the counter has increased.
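
      A minimal sketch of that idea, assuming the counter can be read through some supplier. The class `RestartCounterWait`, the method `waitForIncrease`, and the `AtomicLong` standing in for the metric are hypothetical names for this example; how the real `numRestarts` value is obtained from the MiniCluster is not shown here.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

class RestartCounterWait {

    /**
     * Polls counter until it exceeds baseline and returns the observed value.
     * Throws IllegalStateException if the timeout elapses or the thread is interrupted.
     */
    static long waitForIncrease(
            LongSupplier counter, long baseline, Duration timeout, Duration pollInterval) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long current = counter.getAsLong();
            if (current > baseline) {
                return current;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException(
                        "Counter did not exceed " + baseline + " within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the numRestarts metric (assumption for this sketch).
        AtomicLong numRestarts = new AtomicLong(0);

        // 1. Read the counter BEFORE inducing the failure.
        long before = numRestarts.get();

        // 2. Induce the failure; here the whole restart finishes before the first poll.
        numRestarts.incrementAndGet();

        // 3. Wait until the counter has increased; the finished restart is still visible.
        long after = waitForIncrease(
                numRestarts::get, before, Duration.ofSeconds(5), Duration.ofMillis(100));
        System.out.println("restarts observed: " + (after - before));
    }
}
```

      Because the counter only grows, the poll interval no longer has to be shorter than the restart itself; the baseline read before the failure is what makes the comparison safe.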

      Here is an excerpt from a log where the restart was not detected in time.

      42769 [flink-akka.actor.default-dispatcher-26] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RUNNING to RESTARTING.
      42774 [flink-akka.actor.default-dispatcher-26] INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [ead7cad050ec7a264c0dba0b6e6a6ad9].
      42775 [flink-akka.actor.default-dispatcher-23] INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 543035cf9e19317f92ee559b70ac70bd
      42776 [flink-akka.actor.default-dispatcher-22] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:RELEASING, resource profile: ResourceProfile{taskHeapMemory=170.667gb (183251937962 bytes), taskOffHeapMemory=170.667gb (183251937962 bytes), managedMemory=13.333mb (13981013 bytes), networkMemory=10.667mb (11184810 bytes)}, allocationId: ead7cad050ec7a264c0dba0b6e6a6ad9, jobId: 543035cf9e19317f92ee559b70ac70bd).
      43780 [flink-akka.actor.default-dispatcher-26] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job TaskManager Failover Test (543035cf9e19317f92ee559b70ac70bd) switched from state RESTARTING to RUNNING.
      43783 [flink-akka.actor.default-dispatcher-26] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 543035cf9e19317f92ee559b70ac70bd from Checkpoint 11 @ 1629093422900 for 543035cf9e19317f92ee559b70ac70bd located at <checkpoint-not-externally-addressable>.
      43798 [flink-akka.actor.default-dispatcher-26] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
      43800 [SourceCoordinator-Source: Tested Source -> Sink: Data stream collect sink] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 0 to checkpoint 11 for source Source: Tested Source -> Sink: Data stream collect sink to checkpoint.
      43801 [flink-akka.actor.default-dispatcher-26] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Tested Source -> Sink: Data stream collect sink (1/1) (35c0ee7183308af02db4b09152f1457e) switched from CREATED to SCHEDULED.
      

       

      UPDATE: A better implementation would use RestClient to detect whether tasks have failed.

            People

              renqs Qingsheng Ren
              arvid Arvid Heise