Flink / FLINK-15669

SQL client can't cancel Flink job


Details

    Description

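      In the SQL client, the CLI client cancels a query through the void cancelQuery(String sessionId, String resultId) method of Executor. However, resultId is a random UUID, not the job ID. cancelQueryInternal in LocalExecutor nevertheless tries to turn resultId into a JobID via StringUtils.hexStringToByte, which cannot yield the ID of the submitted job, and any exception from the cancel call is caught and ignored. As a result, the CLI client cannot cancel a running job, and no error is reported.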
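      A minimal, Flink-free sketch of why the conversion cannot work. It assumes, as with Flink's JobID, that a job ID is 16 bytes printed as 32 hex characters. parseJobIdHex is a hypothetical, simplified stand-in for new JobID(StringUtils.hexStringToByte(resultId)), not Flink's implementation; the real classes may fail differently, but in every case no submitted job is addressed.

```java
import java.util.UUID;

/**
 * Hypothetical sketch (not Flink code): a random UUID result id is not a job id.
 * Assumption: a job id is 16 bytes, written as 32 hex characters without dashes.
 */
public class ResultIdVsJobId {

    static final int JOB_ID_BYTES = 16;

    /** Returns the 16 decoded bytes, or null if the input is not a 32-char hex id. */
    static byte[] parseJobIdHex(String hex) {
        if (hex.length() != 2 * JOB_ID_BYTES) {
            return null; // wrong length, e.g. a 36-char UUID string with dashes
        }
        byte[] bytes = new byte[JOB_ID_BYTES];
        for (int i = 0; i < JOB_ID_BYTES; i++) {
            int high = Character.digit(hex.charAt(2 * i), 16);
            int low = Character.digit(hex.charAt(2 * i + 1), 16);
            if (high < 0 || low < 0) {
                return null; // non-hex character such as '-'
            }
            bytes[i] = (byte) ((high << 4) | low);
        }
        return bytes;
    }

    public static void main(String[] args) {
        // What LocalExecutor hands to the CLI as the result id.
        String resultId = UUID.randomUUID().toString();
        // prints "UUID decodes as job id: false" (36 chars, contains dashes)
        System.out.println("UUID decodes as job id: " + (parseJobIdHex(resultId) != null));

        // Stripping the dashes makes it decodable, but the bytes are random,
        // so they still name no job that was submitted to the cluster.
        String stripped = resultId.replace("-", "");
        // prints "stripped UUID decodes as job id: true"
        System.out.println("stripped UUID decodes as job id: " + (parseJobIdHex(stripped) != null));
    }
}
```

      Even a well-formed decode only produces an ID nobody submitted, so the fix has to carry the real job ID from submission to cancellation rather than reinterpret the result id.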

      Related code in LocalExecutor:

      private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
      	 ......
      
      	// store the result with a unique id
      	final String resultId = UUID.randomUUID().toString();
      	resultStore.storeResult(resultId, result);
      
      	......
      
      	// create execution
      	final ProgramDeployer deployer = new ProgramDeployer(
      		configuration, jobName, pipeline);
      
      	// start result retrieval
      	result.startRetrieval(deployer);
      
      	return new ResultDescriptor(
      			resultId,
      			removeTimeAttributes(table.getSchema()),
      			result.isMaterialized());
      }
      
      
      
      private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
      	......
      
      	// stop Flink job
      	try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
      		ClusterClient<T> clusterClient = null;
      		try {
      			// retrieve existing cluster
      			clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
      			try {
      				// ======== cancel job through resultId =======
      				clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
      			} catch (Throwable t) {
      				// the job might have finished earlier
      			}
      		} catch (Exception e) {
      			throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
      		} finally {
      			try {
      				if (clusterClient != null) {
      					clusterClient.close();
      				}
      			} catch (Exception e) {
      				// ignore
      			}
      		}
      	} catch (SqlExecutionException e) {
      		throw e;
      	} catch (Exception e) {
      		throw new SqlExecutionException("Could not locate a cluster.", e);
      	}
      }
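      One possible direction, sketched under assumptions and not the fix that was actually merged: remember the job ID obtained when the job is submitted, keyed by the result id given to the CLI, and look it up again on cancel. ResultJobRegistry and its methods are hypothetical names; the Consumer passed to cancel stands in for a call such as clusterClient.cancel(jobId) from the code above, and job IDs are plain hex strings here to avoid a Flink dependency.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: map each result id to the job id returned at submission
 * time, so that cancellation targets the real job instead of the UUID.
 */
public class ResultJobRegistry {

    // resultId (random UUID shown to the CLI) -> job id reported at submission
    private final Map<String, String> jobIdsByResultId = new ConcurrentHashMap<>();

    /** Records a submitted job and returns the result id handed to the CLI. */
    public String register(String jobIdFromSubmission) {
        String resultId = UUID.randomUUID().toString();
        jobIdsByResultId.put(resultId, jobIdFromSubmission);
        return resultId;
    }

    /** Removes and returns the job id for a result id, if it is known. */
    public Optional<String> remove(String resultId) {
        return Optional.ofNullable(jobIdsByResultId.remove(resultId));
    }

    /**
     * Cancels via the stored job id. 'cancelJob' stands in for the cluster call;
     * an unknown result id is reported instead of being silently ignored.
     */
    public void cancel(String resultId, Consumer<String> cancelJob) {
        String jobId = remove(resultId)
                .orElseThrow(() -> new IllegalArgumentException("Unknown result id: " + resultId));
        cancelJob.accept(jobId);
    }

    public static void main(String[] args) {
        ResultJobRegistry registry = new ResultJobRegistry();
        String resultId = registry.register("a1b2c3d4e5f60718293a4b5c6d7e8f90");
        // prints "cancelling job a1b2c3d4e5f60718293a4b5c6d7e8f90"
        registry.cancel(resultId, jobId -> System.out.println("cancelling job " + jobId));
    }
}
```

      Keeping a separate result id preserves the existing Executor interface; a simpler alternative would be to use the job ID's own hex string as the result id, so that the existing conversion in cancelQueryInternal resolves to the right job.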
      


          People

            godfreyhe godfrey he
            godfreyhe godfrey he
            Votes: 0
            Watchers: 6


              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 10m
