  1. Flink
  2. FLINK-14528

Add a constructor for FlinkKafkaProducer

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.9.0
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

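      In Flink 1.9.0, the defaultTopic parameter is required by the FlinkKafkaProducer constructor.

      In fact, if I use the constructor below, it is not necessary, because the KafkaSerializationSchema already sets the topic on each record it creates. Furthermore, it is confusing for developers when the two places specify different topic names.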

      public FlinkKafkaProducer( String defaultTopic, 
                                 KafkaSerializationSchema<IN> serializationSchema, 
                                 Properties producerConfig, 
                                 FlinkKafkaProducer.Semantic semantic)
      

      For example, set the topic name to bar in the constructor.

      input.addSink( new FlinkKafkaProducer<>( 
                                           "bar", 
                                           new KafkaSerializationSchemaImpl(), 
                                           properties, 
                                           FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
      

      But the records are actually sent to topic baz, which is set in the KafkaSerializationSchema.

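      import java.nio.charset.StandardCharsets;
      import javax.annotation.Nullable;
      import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
      import org.apache.kafka.clients.producer.ProducerRecord;

      public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<KafkaEvent>
      {
          @Override
          public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, @Nullable Long timestamp) {
              // The topic is chosen here ("baz"), not by the defaultTopic passed to the producer.
              return new ProducerRecord<>("baz", event.toString().getBytes(StandardCharsets.UTF_8));
          }
      }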
      

      So I suggest adding a new constructor like the one below.

      public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
                                 Properties producerConfig, 
                                 FlinkKafkaProducer.Semantic semantic)
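
      The mismatch described above, and the shape of the suggested overload, can be sketched without any Flink or Kafka dependency. Everything in the block below (TopicSketch, Record, Schema, Producer) is a hypothetical stand-in written for illustration, not the real connector API. It only models the one point at issue: the record built by the schema carries its own topic. Whether the real FlinkKafkaProducer uses defaultTopic for anything else is not modeled here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TopicSketch {

    /** Hypothetical stand-in for Kafka's ProducerRecord: a topic plus a payload. */
    static final class Record {
        final String topic;
        final byte[] value;

        Record(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for KafkaSerializationSchema: it builds the whole record, topic included. */
    interface Schema<T> {
        Record serialize(T element);
    }

    /** Hypothetical stand-in for the sink; it only remembers which topic each element went to. */
    static final class Producer<T> {
        private final String defaultTopic; // null when the schema is the only source of the topic
        private final Schema<T> schema;
        final List<String> sentTopics = new ArrayList<>();

        // Shape of the existing constructor: a default topic is mandatory.
        Producer(String defaultTopic, Schema<T> schema) {
            this.defaultTopic = defaultTopic;
            this.schema = schema;
        }

        // Shape of the suggested constructor: no default topic argument.
        Producer(Schema<T> schema) {
            this(null, schema);
        }

        void send(T element) {
            Record record = schema.serialize(element);
            // The record's own topic is used; defaultTopic is never consulted in this sketch.
            sentTopics.add(record.topic);
        }
    }

    public static void main(String[] args) {
        Schema<String> toBaz = s -> new Record("baz", s.getBytes(StandardCharsets.UTF_8));

        // "bar" is passed as the default topic, yet the element still goes to "baz".
        Producer<String> withDefault = new Producer<>("bar", toBaz);
        withDefault.send("event-1");
        System.out.println(withDefault.sentTopics); // prints [baz]

        // The suggested overload drops the argument that had no effect above.
        Producer<String> withoutDefault = new Producer<>(toBaz);
        withoutDefault.send("event-2");
        System.out.println(withoutDefault.sentTopics); // prints [baz]
    }
}
```

      In this sketch the one-argument constructor simply delegates to the existing one, so current callers would be unaffected; that delegation is an assumption about how such an overload might be added, not a description of the actual change in Flink.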
      

      This is just my humble opinion; please correct me if I am wrong. Thanks in advance.

       

          People

            Assignee: Unassigned
            Reporter: wooplevip Peng
            Votes: 0
            Watchers: 3
