Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-3281

PTransform name not being propagated to the Flink Web UI

Details

    • Bug
    • Status: Resolved
    • P3
    • Resolution: Duplicate
    • 2.1.0
    • 2.5.0
    • runner-flink

    Description

      This could be related to BEAM-1107, which was logged for Flink Batch processing.

      I am experiencing a similar issue for stream processing. I would have expected the name passed to

      pipeline.apply(String name, PTransform<? super PBegin,OutputT> root)
      

      to be propagated to the Flink Web UI.

      The documentation seems to suggest that this was the intended functionality:
      https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-

      Here is some sample code setting the name:

      
      p.apply("Apply Windowing Function", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
                      .apply("Transform the Pipeline to Key by Window",
                              ParDo.of(
                                      new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                          @ProcessElement
                                          public void processElement(ProcessContext context, IntervalWindow window) {
                                              context.output(KV.of(window, context.element()));
                                          }
                                      }))
                      .apply("Group by Key (window)", GroupByKey.create())
                      .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
                      .apply("Write output to Kafka", KafkaIO.<IntervalWindowResult, PueResult>write()
                              .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
                              .withTopic("results")
                              .withKeySerializer(IntervalWindowResultSerialiser.class)
                              .withValueSerializer(PueResultSerialiser.class)
                      );
      
      

      I will upload a screenshot of the results.

      Attachments

        1. flink-dashboard.PNG
          61 kB
          Thalita Vergilio

        Issue Links

          Activity

            People

              aljoscha Aljoscha Krettek
              javalass Thalita Vergilio
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: