Details
- Type: Sub-task
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- Version: 1.19.0
Description
Test suggestion:
1. Prepare a DataStream job in which all tasks throw an exception directly.
   - Set the parallelism to 5 or above.
2. Prepare the following configuration options:
   - restart-strategy.type: exponential-delay
   - restart-strategy.exponential-delay.attempts-before-reset-backoff: 7
3. Start the cluster: ./bin/start-cluster.sh
4. Run the job: ./bin/flink run -c className jarName
5. Check the result:
   - Check whether the job is retried 7 times.
   - Check the exception history: the list should contain 7 exceptions.
   - Every entry except the last one should show all 5 subtasks (they are concurrent exceptions).
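The checks in step 5 can be written down as a small predicate. This is a sketch, not a Flink API: it assumes the tester transcribes the exception history by hand into a list of entries, ordered from the first failure to the final one, where each entry holds the subtask indices it reports (the root exception plus its concurrent exceptions). The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of Flink): each inner list holds the subtask
// indices reported by one exception-history entry, transcribed by hand.
// Assumption: entries are ordered from the first failure to the final one.
final class ExceptionHistoryCheck {

    private ExceptionHistoryCheck() {}

    /**
     * Returns true if the history has exactly expectedEntries entries and every
     * entry except the last one reports exactly the subtasks 0..parallelism-1.
     */
    static boolean matchesExpectation(List<List<Integer>> history,
                                      int expectedEntries,
                                      int parallelism) {
        if (history.size() != expectedEntries) {
            return false;
        }
        for (int i = 0; i < history.size() - 1; i++) {
            Set<Integer> subtasks = new HashSet<>(history.get(i));
            if (subtasks.size() != parallelism) {
                return false;
            }
            for (int s = 0; s < parallelism; s++) {
                if (!subtasks.contains(s)) {
                    return false;
                }
            }
        }
        // The last entry is not checked, matching "except the last one".
        return true;
    }

    public static void main(String[] args) {
        List<Integer> all = List.of(0, 1, 2, 3, 4);
        List<List<Integer>> history = List.of(all, all, all, all, all, all, List.of(2));
        System.out.println(matchesExpectation(history, 7, 5)); // prints true
    }
}
```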
Note: Test the options from step 2 at two levels, separately:
- Cluster level: set them in config.yaml.
- Job level: set them in the code.
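Testing both levels matters because Flink uses the cluster-level restart strategy only as a default for jobs that define none of their own; a job-level strategy overrides it. The Java sketch below models that rule with plain maps. It is a simplification, not Flink's configuration loading, and it assumes a level "defines" a strategy when it sets restart-strategy.type. With the values shown, the job level (6, as in the demo) takes effect over the cluster level (7, as in step 2).

```java
import java.util.Map;

// Simplified model (not Flink code): the cluster-level restart configuration
// is only a default, used when the job defines no restart strategy itself.
final class RestartConfigPrecedence {

    static final String TYPE_KEY = "restart-strategy.type";
    static final String ATTEMPTS_KEY =
            "restart-strategy.exponential-delay.attempts-before-reset-backoff";

    private RestartConfigPrecedence() {}

    /** Picks the whole configuration of the level that defines a strategy, job level first. */
    static Map<String, String> effective(Map<String, String> cluster, Map<String, String> job) {
        return job.containsKey(TYPE_KEY) ? job : cluster;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of(TYPE_KEY, "exponential-delay", ATTEMPTS_KEY, "7");
        Map<String, String> job = Map.of(TYPE_KEY, "exponential-delay", ATTEMPTS_KEY, "6");
        System.out.println(effective(cluster, job).get(ATTEMPTS_KEY)); // prints 6
    }
}
```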
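Job-level demo:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Placeholder class name; pass it to "-c className" when submitting the job.
    public class ExponentialDelayRestartDemo {

        public static void main(String[] args) throws Exception {
            // Job-level restart configuration; it overrides the cluster-level default.
            Configuration conf = new Configuration();
            conf.setString("restart-strategy.type", "exponential-delay");
            // Note: this demo uses 6, while step 2 uses 7.
            conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setParallelism(5);

            // Bounded source: 300 Long values, rate-limited to 10 records per second.
            DataGeneratorSource<Long> generatorSource = new DataGeneratorSource<>(
                    value -> value, 300, RateLimiterStrategy.perSecond(10), Types.LONG);

            env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
                    .map(new RichMapFunction<Long, Long>() {
                        @Override
                        public Long map(Long value) {
                            // Fails on every record, so each subtask fails on its first value.
                            throw new RuntimeException(
                                    "Expected testing exception, subtaskIndex: "
                                            + getRuntimeContext().getIndexOfThisSubtask());
                        }
                    })
                    .print();
            env.execute();
        }
    }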
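While the demo job is retrying, consecutive restarts are separated by a backoff that grows exponentially up to a cap, so the retries checked in step 5 do not happen immediately. The Java sketch below estimates those delays under a simplified model: each delay is the previous one times a multiplier, capped at a maximum, with jitter and the backoff reset ignored. The initial backoff, multiplier, and cap used in main are illustrative assumptions, not values from this ticket or Flink's defaults.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of exponential-delay backoff (not Flink code): ignores
// jitter and the reset of the backoff after a stable run.
final class ExponentialBackoffEstimate {

    private ExponentialBackoffEstimate() {}

    /** Returns the delay in ms before each of the first {@code restarts} restarts. */
    static List<Long> delaysMs(long initialMs, double multiplier, long maxMs, int restarts) {
        List<Long> delays = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < restarts; i++) {
            delays.add((long) Math.min(current, maxMs));
            current = Math.min(current * multiplier, maxMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed values: 1 s initial backoff, multiplier 2.0, 60 s cap, 7 restarts.
        List<Long> delays = delaysMs(1_000, 2.0, 60_000, 7);
        long total = delays.stream().mapToLong(Long::longValue).sum();
        // prints [1000, 2000, 4000, 8000, 16000, 32000, 60000] total=123000 ms
        System.out.println(delays + " total=" + total + " ms");
    }
}
```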
Issue Links
- is a clone of: FLINK-34288 Release Testing Instructions: Verify FLINK-33735 Improve the exponential-delay restart-strategy (Closed)