Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 1.15.0
Fix Version/s: None
Description
In the code, the graph data shown in the Flink web UI comes from the following method:
AdaptiveScheduler.requestJob()
@Override
public ExecutionGraphInfo requestJob() {
    return new ExecutionGraphInfo(state.getJob(), exceptionHistory.toArrayList());
}
This ExecutionGraphInfo wraps the ExecutionGraph that is built and restored from state when the job restarts.
As the code below shows, the parallelism is recalculated and applied to a copy of the JobGraph:
AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().
vertexParallelism = determineParallelism(slotAllocator);
JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
for (JobVertex vertex : adjustedJobGraph.getVertices()) {
    JobVertexID id = vertex.getID();
    // use the determined "available parallelism" to use
    // the resources we have access to
    vertex.setParallelism(vertexParallelism.getParallelism(id));
}
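The copy-and-adjust step can be illustrated with a minimal, self-contained sketch. The classes Vertex and Graph and the methods determineParallelism(graph, availableSlots) and adjust(...) are hypothetical stand-ins, not Flink APIs; in particular, capping each vertex at the number of available slots is an assumed simplification of what the real slot allocator computes. The point it shows is that the adjusted parallelism lives only on the copy, while the original graph keeps its submitted values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ParallelismAdjustmentSketch {

    /** Hypothetical stand-in for Flink's JobVertex: only an id and a parallelism. */
    static final class Vertex {
        final String id;
        int parallelism;

        Vertex(String id, int parallelism) {
            this.id = id;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical stand-in for Flink's JobGraph. */
    static final class Graph {
        final List<Vertex> vertices = new ArrayList<>();

        /** Deep copy, analogous in spirit to jobInformation.copyJobGraph(). */
        Graph copy() {
            Graph copy = new Graph();
            for (Vertex v : vertices) {
                copy.vertices.add(new Vertex(v.id, v.parallelism));
            }
            return copy;
        }
    }

    /**
     * Assumed simplification of determineParallelism(slotAllocator):
     * each vertex gets min(configured parallelism, available slots).
     */
    static Map<String, Integer> determineParallelism(Graph graph, int availableSlots) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Vertex v : graph.vertices) {
            result.put(v.id, Math.min(v.parallelism, availableSlots));
        }
        return result;
    }

    /** Mirrors the loop above: compute parallelism, then set it on a copy only. */
    static Graph adjust(Graph original, int availableSlots) {
        Map<String, Integer> vertexParallelism = determineParallelism(original, availableSlots);
        Graph adjusted = original.copy();
        for (Vertex v : adjusted.vertices) {
            v.parallelism = vertexParallelism.get(v.id);
        }
        return adjusted;
    }

    public static void main(String[] args) {
        Graph original = new Graph();
        original.vertices.add(new Vertex("source", 4));
        original.vertices.add(new Vertex("sink", 2));

        Graph adjusted = adjust(original, 3);
        // The copy carries the reduced parallelism; the original is untouched.
        System.out.println(adjusted.vertices.get(0).parallelism); // 3
        System.out.println(original.vertices.get(0).parallelism); // 4
    }
}
```

Because adjust() never mutates its input, anything that later calls copy() on the original graph again starts from the submitted parallelism, which is the situation the next part of this report describes.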
However, createExecutionGraphAndRestoreState() copies the JobGraph from jobInformation again instead of using the adjusted copy, so the JobGraph always carries the parallelism from the first deployment.
AdaptiveScheduler.createExecutionGraphAndRestoreState(VertexParallelismStore adjustedParallelismStore)
private ExecutionGraph createExecutionGraphAndRestoreState(
        VertexParallelismStore adjustedParallelismStore) throws Exception {
    return executionGraphFactory.createAndRestoreExecutionGraph(
            jobInformation.copyJobGraph(),
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
            initializationTimestamp,
            vertexAttemptNumberStore,
            adjustedParallelismStore,
            deploymentTimeMetrics,
            LOG);
}
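The effect of the second copyJobGraph() call can be reproduced with a small, self-contained sketch. Every class and method here (Vertex, JobInfo, RestoredGraph, restore, restoreWithAdjustedGraph) is hypothetical and heavily simplified; in the sketch, the adjusted parallelism store is passed along separately, as adjustedParallelismStore is above, but the graph object itself is rebuilt from the submitted graph. restoreWithAdjustedGraph is only an illustration of one conceivable direction, not a confirmed fix.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StaleParallelismSketch {

    /** Hypothetical stand-in for JobVertex: only an id and a parallelism. */
    static final class Vertex {
        final String id;
        int parallelism;

        Vertex(String id, int parallelism) {
            this.id = id;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical stand-in for JobInformation, which keeps the submitted graph. */
    static final class JobInfo {
        private final List<Vertex> submitted = new ArrayList<>();

        void addVertex(String id, int parallelism) {
            submitted.add(new Vertex(id, parallelism));
        }

        /** Fresh deep copy of the submitted graph, like copyJobGraph(). */
        List<Vertex> copyJobGraph() {
            List<Vertex> copy = new ArrayList<>();
            for (Vertex v : submitted) {
                copy.add(new Vertex(v.id, v.parallelism));
            }
            return copy;
        }
    }

    /** Hypothetical stand-in for the restored ExecutionGraph. */
    static final class RestoredGraph {
        final List<Vertex> jobGraph;                 // in this sketch, what the UI reads
        final Map<String, Integer> parallelismStore; // in this sketch, what scheduling uses

        RestoredGraph(List<Vertex> jobGraph, Map<String, Integer> parallelismStore) {
            this.jobGraph = jobGraph;
            this.parallelismStore = parallelismStore;
        }
    }

    /** Mirrors the current flow: the graph starts from a fresh, unadjusted copy. */
    static RestoredGraph restore(JobInfo info, Map<String, Integer> adjustedStore) {
        return new RestoredGraph(info.copyJobGraph(), adjustedStore);
    }

    /** Hypothetical alternative: apply the adjusted parallelism to the copy first. */
    static RestoredGraph restoreWithAdjustedGraph(JobInfo info, Map<String, Integer> adjustedStore) {
        List<Vertex> graph = info.copyJobGraph();
        for (Vertex v : graph) {
            v.parallelism = adjustedStore.getOrDefault(v.id, v.parallelism);
        }
        return new RestoredGraph(graph, adjustedStore);
    }

    public static void main(String[] args) {
        JobInfo info = new JobInfo();
        info.addVertex("source", 4);

        Map<String, Integer> adjusted = new LinkedHashMap<>();
        adjusted.put("source", 3); // e.g. only 3 slots were available on restart

        RestoredGraph current = restore(info, adjusted);
        System.out.println(current.jobGraph.get(0).parallelism);     // 4 (stale)
        System.out.println(current.parallelismStore.get("source"));  // 3

        RestoredGraph alternative = restoreWithAdjustedGraph(info, adjusted);
        System.out.println(alternative.jobGraph.get(0).parallelism); // 3
    }
}
```

In the sketch, the mismatch appears only on the graph object: the store holds the recalculated value, while the freshly copied graph still reports the submitted parallelism.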