FLINK-15669

SQL client can't cancel flink job

    Details

      Description

      In the SQL client, the CLI client cancels a query through the void cancelQuery(String sessionId, String resultId) method in Executor. However, the resultId is a random UUID, not the job ID, so the CLI client cannot cancel a running job.
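
To make the mismatch concrete, here is a minimal, self-contained sketch. It assumes a job ID is written as 32 hexadecimal characters (16 bytes); the class ResultIdVsJobId and the helper looksLikeJobIdHex are hypothetical names used only for illustration and are not part of Flink.

```java
import java.util.UUID;

public class ResultIdVsJobId {

    // Assumption: a job ID is written as 32 hexadecimal characters (16 bytes).
    static final int JOB_ID_HEX_LENGTH = 32;

    /** Hypothetical helper: true if the text has the shape of a hex-encoded job ID. */
    public static boolean looksLikeJobIdHex(String text) {
        return text.length() == JOB_ID_HEX_LENGTH && text.matches("[0-9a-fA-F]+");
    }

    public static void main(String[] args) {
        // Same construction as the resultId in executeQueryInternal.
        String resultId = UUID.randomUUID().toString();

        // UUID.toString() yields 36 characters, four of which are hyphens.
        System.out.println(resultId + " (length " + resultId.length() + ")");
        System.out.println("has job ID shape: " + looksLikeJobIdHex(resultId));
    }
}
```

Even with the hyphens stripped, the remaining 32 hex characters would only be well-formed. They would still not identify the submitted job, because the UUID is generated independently of job submission.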

      Related code in LocalExecutor:

      private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
      	 ......
      
      	// store the result with a unique id
      	final String resultId = UUID.randomUUID().toString();
      	resultStore.storeResult(resultId, result);
      
      	......
      
      	// create execution
      	final ProgramDeployer deployer = new ProgramDeployer(
      		configuration, jobName, pipeline);
      
      	// start result retrieval
      	result.startRetrieval(deployer);
      
      	return new ResultDescriptor(
      			resultId,
      			removeTimeAttributes(table.getSchema()),
      			result.isMaterialized());
      }
      
      
      
      private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
      	......
      
      	// stop Flink job
      	try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
      		ClusterClient<T> clusterClient = null;
      		try {
      			// retrieve existing cluster
      			clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
      			try {
      				// ======== cancel job through resultId =======
      				clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
      			} catch (Throwable t) {
      				// the job might have finished earlier
      			}
      		} catch (Exception e) {
      			throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
      		} finally {
      			try {
      				if (clusterClient != null) {
      					clusterClient.close();
      				}
      			} catch (Exception e) {
      				// ignore
      			}
      		}
      	} catch (SqlExecutionException e) {
      		throw e;
      	} catch (Exception e) {
      		throw new SqlExecutionException("Could not locate a cluster.", e);
      	}
      }
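
One possible direction, sketched under assumptions and not necessarily the fix adopted for this issue, is to remember which job ID belongs to each resultId at submission time and look it up again on cancel. The class ResultJobRegistry and its methods are hypothetical, and job IDs are modelled as plain strings so the example runs without Flink on the classpath.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that maps a result ID to the job ID of the submitted job. */
public class ResultJobRegistry {

    private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

    /** Called once the job has been submitted and its real job ID is known. */
    public void register(String resultId, String jobId) {
        jobIdByResultId.put(resultId, jobId);
    }

    /** Called on cancel: returns the job ID to pass to the cluster client, if any. */
    public Optional<String> jobIdFor(String resultId) {
        return Optional.ofNullable(jobIdByResultId.get(resultId));
    }

    /** Called when the result is closed so that stale entries do not accumulate. */
    public Optional<String> remove(String resultId) {
        return Optional.ofNullable(jobIdByResultId.remove(resultId));
    }

    public static void main(String[] args) {
        ResultJobRegistry registry = new ResultJobRegistry();
        // Illustrative values only; a real job ID would come from the job submission.
        registry.register("result-1", "0123456789abcdef0123456789abcdef");

        System.out.println("cancel target: " + registry.jobIdFor("result-1").orElse("<unknown>"));
        System.out.println("unknown result: " + registry.jobIdFor("result-2").isPresent());
    }
}
```

With such a mapping, cancelQueryInternal could cancel the job that was actually submitted instead of deriving a job ID from the resultId, and a missing entry could be reported rather than silently swallowed.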
      

              People

              • Assignee: godfrey he (godfreyhe)
              • Reporter: godfrey he (godfreyhe)
              • Votes: 0
              • Watchers: 7

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m