Details
- Type: Improvement
- Status: Closed
- Priority: Critical
- Resolution: Done
- Affects Version/s: 1.11.0
Executed SQL:

    StreamTableEnvironment.executeSql(
        "insert into user_log_sink select user_id, item_id, category_id, behavior, ts from user_log")

The job name is currently generated in org.apache.flink.table.api.internal.TableEnvironmentImpl:

    public TableResult executeInternal(List<ModifyOperation> operations) {
        List<Transformation<?>> transformations = translate(operations);
        List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
        // the job name is hard-coded from the sink identifiers; users cannot override it
        String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
        Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
        try {
            JobClient jobClient = execEnv.executeAsync(pipeline);
            TableSchema.Builder builder = TableSchema.builder();
            Object[] affectedRowCounts = new Long[operations.size()];
            for (int i = 0; i < operations.size(); ++i) {
                // use sink identifier name as field name
                builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
                affectedRowCounts[i] = -1L;
            }
            return TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .tableSchema(builder.build())
                    .data(Collections.singletonList(Row.of(affectedRowCounts)))
                    .build();
        } catch (Exception e) {
            throw new TableException("Failed to execute sql", e);
        }
    }
Introduce `pipeline.name` to allow users to specify the job name via configuration. This option does not break existing pipelines.
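A minimal sketch of how the option can be used, assuming a Flink release (1.13+) that includes this improvement; the job name and the registered tables here are illustrative:

    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PipelineNameExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // "pipeline.name" (PipelineOptions.NAME) overrides the auto-generated
            // "insert-into_<sink-table-name>" job name
            tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "user-log-etl");

            // assumes user_log and user_log_sink were registered beforehand
            tEnv.executeSql(
                    "insert into user_log_sink "
                            + "select user_id, item_id, category_id, behavior, ts from user_log");
        }
    }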
Description
In Flink 1.11.0, StreamTableEnvironment.executeSql(sql) will plan and execute the job immediately, and the job name is fixed to "insert-into_<sink-table-name>". When multiple SQL jobs insert into the same sink table, they all get the same job name, which is not very friendly.
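A sketch of the collision (source table names are hypothetical; the exact qualifier in the generated name depends on the catalog and database): both statements below are submitted as separate jobs, yet both receive the same auto-generated name, so they cannot be told apart in the web UI.

    // both jobs are named "insert-into_default_catalog.default_database.user_log_sink"
    tEnv.executeSql("insert into user_log_sink select * from user_log_clicks");
    tEnv.executeSql("insert into user_log_sink select * from user_log_orders");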
Issue Links
- duplicates
  - FLINK-22707 StatementSet support execute(jobName) (Closed)
  - FLINK-20242 Support to set job name in StatementSet.execute() method (Closed)
  - FLINK-16709 add a set command to set job name when submit job on sql client (Closed)