Details
-
Improvement
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
None
-
None
Description
Currently,
1. the digest of TableSourceScan and Sink doesn't contain the connector information which will be quite useful when debugging.
2. The table name is quite verbose when under default catalog and database, would be better to simplify it to only table name if under default catalog and database.
3. Maybe it's nicer to have changelog mode of source and sink, because it's a meta information of DynamicTableSource/Sink#getChangelogMode.
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE]) +- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D]) +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D]) :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D]) : +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D]) : +- Exchange(distribution=[hash[city_id]], changelogMode=[I]) : +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I]) : +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D]) : +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D]) : +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D]) : +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D]) : +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D]) +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D]) +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D]) +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
Attachments
Issue Links
- relates to
-
FLINK-20346 Explain ChangelogMode for sinks
- Open