Flink / FLINK-15669

SQL client can't cancel Flink job

Details

    Description

      In the SQL client, the CLI client cancels a query through the void cancelQuery(String sessionId, String resultId) method in Executor. However, resultId is a random UUID, not the job ID. LocalExecutor builds the JobID to cancel from this resultId, so the cancel call never targets the running job and the CLI client can't cancel it.

      Related code in LocalExecutor:

      private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
      	 ......
      
      	// store the result with a unique id
      	final String resultId = UUID.randomUUID().toString();
      	resultStore.storeResult(resultId, result);
      
      	......
      
      	// create execution
      	final ProgramDeployer deployer = new ProgramDeployer(
      		configuration, jobName, pipeline);
      
      	// start result retrieval
      	result.startRetrieval(deployer);
      
      	return new ResultDescriptor(
      			resultId,
      			removeTimeAttributes(table.getSchema()),
      			result.isMaterialized());
      }
      
      
      
      private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
      	......
      
      	// stop Flink job
      	try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
      		ClusterClient<T> clusterClient = null;
      		try {
      			// retrieve existing cluster
      			clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
      			try {
      				// ======== cancel job through resultId =======
      				clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
      			} catch (Throwable t) {
      				// the job might have finished earlier
      			}
      		} catch (Exception e) {
      			throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
      		} finally {
      			try {
      				if (clusterClient != null) {
      					clusterClient.close();
      				}
      			} catch (Exception e) {
      				// ignore
      			}
      		}
      	} catch (SqlExecutionException e) {
      		throw e;
      	} catch (Exception e) {
      		throw new SqlExecutionException("Could not locate a cluster.", e);
      	}
      }
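
One possible direction for a fix, sketched here in plain Java under stated assumptions: remember the real job ID when the job is submitted, keyed by resultId, and look it up again on cancel instead of parsing resultId as a job ID. ResultJobRegistry and its methods are hypothetical names and not part of Flink. The sketch also assumes the job ID is handled as a 32-character hex string and that it is known when the query is started.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not a Flink class): maps a result id to the id of the job producing it. */
final class ResultJobRegistry {

    // resultId (random UUID string) -> job id as a 32-character hex string (assumed format)
    private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

    /** Returns true only if every character is a hex digit. UUID strings contain dashes, so they fail. */
    static boolean isPlainHex(String s) {
        return !s.isEmpty() && s.chars().allMatch(c -> Character.digit(c, 16) >= 0);
    }

    /** Would be called once the job has been submitted and its real id is known. */
    void register(String resultId, String jobIdHex) {
        if (jobIdHex.length() != 32 || !isPlainHex(jobIdHex)) {
            throw new IllegalArgumentException("Not a 32-character hex job id: " + jobIdHex);
        }
        jobIdByResultId.put(resultId, jobIdHex);
    }

    /** Would be called on cancel: resolves and forgets the job id for this result, if one was registered. */
    Optional<String> removeJobId(String resultId) {
        return Optional.ofNullable(jobIdByResultId.remove(resultId));
    }
}
```

In this sketch, executeQueryInternal would call register(resultId, jobIdHex) after submission, and cancelQueryInternal would pass the value returned by removeJobId(resultId) to the cluster client rather than resultId itself. An empty Optional would mean no job is known for that result, which could be reported to the user instead of being swallowed.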
      

            People

              godfreyhe godfrey he

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 10m