Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-35149

Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

    XMLWordPrintableJSON

Details

    Description

      Current , when sink is not instanceof TwoPhaseCommittingSink, use input.transform rather than stream. It means that pre-write topology will be ignored.

      private void sinkTo(
              DataStream<Event> input,
              Sink<Event> sink,
              String sinkName,
              OperatorID schemaOperatorID) {
          DataStream<Event> stream = input;
          // Pre write topology
          if (sink instanceof WithPreWriteTopology) {
              stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
          }
      
          if (sink instanceof TwoPhaseCommittingSink) {
              addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
          } else {
              input.transform(
                      SINK_WRITER_PREFIX + sinkName,
                      CommittableMessageTypeInfo.noOutput(),
                      new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
          }
      } 

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              loserwang1024 Hongshun Wang
              Votes:
              1 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated: