Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15450

Add kafka topic information to Kafka source name on Flink UI

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Reopened
    • Not a Priority
    • Resolution: Unresolved
    • None
    • None
    • None

    Description

      If the user did not specify a custom name to the source, e.g. Kafka source, Flink would use the default name "Custom Source", which was not intuitive (Sink was the same).

      Source: Custom Source -> Filter -> Map -> Sink: Unnamed
      

      If we could add the Kafka topic information to the default Source/Sink name, it would be very helpful to catch the consuming/publishing topic quickly, like this:

      Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
      

      Suggestion (forgive me if it makes too many changes)

      1. Add a `name` method to interface `Function`

      public interface Function extends java.io.Serializable {
      	default String name() { return ""; }
      }
      

      2. Source/Sink/Other functions override this method depending on their needs.

      class FlinkKafkaConsumerBase {
      
      String name() {
        return this.topicsDescriptor.toString();
      }
      
      }
      

      3. Use Function#name if the returned value is not empty.

      // StreamExecutionEnvironment
      	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
      		String sourceName = function.name();
      		if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
      			sourceName = "Custom Source";
      		}
      		return addSource(function, sourceName);
      	}
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            victor-wong jiasheng55
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated: