Details
-
Improvement
-
Status: Closed
-
Minor
-
Resolution: Duplicate
-
1.9.0
-
None
-
None
Description
In flink 1.9.0, defaultTopic param is required for FlinkKafkaProducer constructor.
In fact, if I use the below constructor, it is not necessary. Furthermore, it is confused for developer to specify different topic name.
public FlinkKafkaProducer( String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic)
For example, set topic name to bar in the constructor.
input.addSink( new FlinkKafkaProducer<>( "bar", new KafkaSerializationSchemaImpl(), properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
But actually send records to topic baz from KafkaSerializationSchema.
public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<KafkaEvent> { @Override public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, @Nullable Long timestamp) { return new ProducerRecord<>("baz", event.toString().getBytes()); } }
So I suggest add a new constructor like below.
public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaProducer.Semantic semantic)
It is my humble opinion, please correct me, thanks in advance.