Details
-
Bug
-
Status: Open
-
Minor
-
Resolution: Unresolved
-
cdc-3.1.0
Description
Current , when sink is not instanceof TwoPhaseCommittingSink, use input.transform rather than stream. It means that pre-write topology will be ignored.
private void sinkTo( DataStream<Event> input, Sink<Event> sink, String sinkName, OperatorID schemaOperatorID) { DataStream<Event> stream = input; // Pre write topology if (sink instanceof WithPreWriteTopology) { stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream); } if (sink instanceof TwoPhaseCommittingSink) { addCommittingTopology(sink, stream, sinkName, schemaOperatorID); } else { input.transform( SINK_WRITER_PREFIX + sinkName, CommittableMessageTypeInfo.noOutput(), new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID)); } }
Attachments
Issue Links
- links to