Details
-
Improvement
-
Status: Reopened
-
Not a Priority
-
Resolution: Unresolved
-
None
-
None
-
None
Description
If the user did not specify a custom name to the source, e.g. Kafka source, Flink would use the default name "Custom Source", which was not intuitive (Sink was the same).
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
If we could add the Kafka topic information to the default Source/Sink name, it would be very helpful to catch the consuming/publishing topic quickly, like this:
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
Suggestion (forgive me if it makes too many changes)
1. Add a `name` method to interface `Function`
public interface Function extends java.io.Serializable { default String name() { return ""; } }
2. Source/Sink/Other functions override this method depending on their needs.
class FlinkKafkaConsumerBase { String name() { return this.topicsDescriptor.toString(); } }
3. Use Function#name if the returned value is not empty.
// StreamExecutionEnvironment public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) { String sourceName = function.name(); if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { sourceName = "Custom Source"; } return addSource(function, sourceName); }