FLINK-5702

Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised

    Details

      Description

      The documentation for the FlinkKafkaProducer does not contain any information about `setLogFailuresOnly`. It should emphasize that if users choose to only log failures instead of failing the sink, at-least-once cannot be guaranteed.
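
      For context, here is a minimal sketch of the configuration this issue is about, written in the style of the documentation examples quoted in the PR below (broker address and topic name are placeholders, and the `...` source follows the docs' own convention):

          DataStream<String> stream = ...; // some upstream source

          FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<>(
              "localhost:9092",          // broker list
              "my-topic",                // target topic
              new SimpleStringSchema()); // serialization schema

          // both settings are needed for at-least-once:
          myProducer.setLogFailuresOnly(false); // fail the sink on send errors instead of only logging them
          myProducer.setFlushOnCheckpoint(true); // flush pending records on each checkpoint

          stream.addSink(myProducer);

      With setLogFailuresOnly(true), send failures are merely logged and the affected records are silently dropped, which is exactly the at-least-once violation the documentation should warn about.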

        Issue Links

          Activity

          Stephan Ewen added a comment -

          Good idea.

          I would also change the default setting to make "flush/sync on checkpoint" activated by default.

          Tzu-Li (Gordon) Tai added a comment -

          Stephan Ewen Good point about changing the default. In that case we might also need to think about migrating the FlinkKafkaProducerBase, though, as changing the default is a user-behaviour-breaking change. I'll open a separate JIRA for this.

          ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3282

          FLINK-5702 [doc] At-least-once configuration info for FlinkKafkaProducer

          Adds information about the `setLogFailuresOnly` and `setFlushOnCheckpoint` methods with regard to the at-least-once guarantees of the producer, and warns that at-least-once is compromised if they are configured inappropriately.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5702

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3282.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3282


          commit b90a5db99802d8d483ad064f21f8a3fbe42c4076
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-02-07T06:32:28Z

          FLINK-5702 [doc] At-least-once configuration info for FlinkKafkaProducer


          ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99790794

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          — End diff –

          Maybe `The above examples demonstrate...`?

          ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99791696

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          + constructor. This partitioner will be called for each record in the stream
          + to determine which exact partition the record will be sent to.
          + * Advanced serialization schema: Similar to the consumer,
          + the producer also allows using a advanced serialization schema called `KeyedSerializationSchema`,
          — End diff –

          `an advanced...`?

          ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99792597

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          + constructor. This partitioner will be called for each record in the stream
          + to determine which exact partition the record will be sent to.
          + * Advanced serialization schema: Similar to the consumer,
          + the producer also allows using a advanced serialization schema called `KeyedSerializationSchema`,
          + which allows serializing the key and value separately. It also allows to override the target topic,
          + so that one producer instance can send data to multiple topics.
          +
          +The example also shows how to configure the Flink Kafka Producer for at-least-once
          +guarantees, with the setter methods `setLogFailuresOnly` and `setFlushOnCheckpoint`:
          +
          + * `setLogFailuresOnly`: enabling this will let the producer log failures only
          — End diff –

          Does it make sense to add a configuration like `setAtLeastOnce`? How often do users actually configure single parameters separately?
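
          For illustration, a hypothetical sketch of the shortcut uce is proposing (`setAtLeastOnce` does not exist in the producer API; it would merely bundle the two existing setters):

              // hypothetical convenience setter (not part of the actual API)
              public void setAtLeastOnce(boolean atLeastOnce) {
                  setLogFailuresOnly(!atLeastOnce); // fail the sink on errors instead of only logging
                  setFlushOnCheckpoint(atLeastOnce); // flush pending records on checkpoints
              }

          Such a setter would keep the two flags consistent with each other, at the cost of hiding that they can in principle be toggled independently.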

          ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99791528

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          — End diff –

          Maybe `providing custom Properties...`?

          ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99792130

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          + constructor. This partitioner will be called for each record in the stream
          + to determine which exact partition the record will be sent to.
          + * Advanced serialization schema: Similar to the consumer,
          + the producer also allows using a advanced serialization schema called `KeyedSerializationSchema`,
          + which allows serializing the key and value separately. It also allows to override the target topic,
          + so that one producer instance can send data to multiple topics.
          +
          +The example also shows how to configure the Flink Kafka Producer for at-least-once
          — End diff –

          Should we move the at least once configuration up the hierarchy and make it a sub heading?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99791642

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          — End diff –

          Do you actually mean `KafkaProducer` here?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99793771

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          — End diff –

          Ah, this should be `KafkaPartitioner`, will change.
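
          For reference, a minimal sketch of what such a custom partitioner could look like. This is an illustration only: the `HashPartitioner` name is made up, and the `open`/`partition` signatures are assumed from the Flink 1.2-era `KafkaPartitioner` base class, not taken from this PR.

          {% highlight java %}
          import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

          // Hypothetical partitioner that spreads records across all target
          // partitions by the record's hash code (signatures assumed from the
          // Flink 1.2-era KafkaPartitioner base class).
          public class HashPartitioner<T> extends KafkaPartitioner<T> {

              @Override
              public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
                  // no per-subtask state needed for this sketch
              }

              @Override
              public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
                  // mask out the sign bit so the result is always a valid partition index
                  return (next.hashCode() & Integer.MAX_VALUE) % numPartitions;
              }
          }
          {% endhighlight %}

          An instance of such a partitioner would then be handed to one of the producer constructor variants that accepts a partitioner argument.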

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99793908

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          + constructor. This partitioner will be called for each record in the stream
          + to determine which exact partition the record will be sent to.
          + * Advanced serialization schema: Similar to the consumer,
          + the producer also allows using a advanced serialization schema called `KeyedSerializationSchema`,
          + which allows serializing the key and value separately. It also allows to override the target topic,
          + so that one producer instance can send data to multiple topics.
          +
          +The example also shows how to configure the Flink Kafka Producer for at-least-once
          — End diff –

          Good idea.
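
          As a side note for readers of this thread, the `KeyedSerializationSchema` mentioned in the diff above can be illustrated with a minimal sketch. The three-method shape (`serializeKey`, `serializeValue`, `getTargetTopic`) is assumed from the Flink 1.2-era connector API, and the class and topic names are made up for the example:

          {% highlight java %}
          import java.nio.charset.StandardCharsets;

          import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

          // Illustrative schema: no message key, UTF-8 value bytes, and a
          // per-record target topic so one producer instance can write to
          // multiple topics.
          public class MultiTopicSchema implements KeyedSerializationSchema<String> {

              @Override
              public byte[] serializeKey(String element) {
                  return null; // no key for this sketch
              }

              @Override
              public byte[] serializeValue(String element) {
                  return element.getBytes(StandardCharsets.UTF_8);
              }

              @Override
              public String getTargetTopic(String element) {
                  // route error records to a dedicated topic; returning null
                  // falls back to the producer's default target topic
                  return element.startsWith("ERROR") ? "error-topic" : null;
              }
          }
          {% endhighlight %}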

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3282#discussion_r99796056

          — Diff: docs/dev/connectors/kafka.md —
          @@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.

              1. Kafka Producer

          -The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
          -records to partitions.
          +Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
          +It allows writing a stream of records to one or more Kafka topics.

          Example:

          -
          <div class="codetabs" markdown="1">
          <div data-lang="java, Kafka 0.8+" markdown="1">

          {% highlight java %}
          -stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema()); // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false); // "false" by default
          +myProducer.setFlushOnCheckpoint(true); // "false" by default
          +
          +stream.addSink(myProducer);
          {% endhighlight %}
          </div>
          <div data-lang="java, Kafka 0.10+" markdown="1">
          {% highlight java %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +DataStream<String> stream = ...;
          +
          +FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema(), // serialization schema
          + properties); // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false); // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true); // "false" by default

          {% endhighlight %}
          </div>
          <div data-lang="scala, Kafka 0.8+" markdown="1">
          {% highlight scala %}
          -stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
          +val stream: DataStream[String] = ...
          +
          +val myProducer = new FlinkKafkaProducer08[String](
          + "localhost:9092", // broker list
          + "my-topic", // target topic
          + new SimpleStringSchema) // serialization schema
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducer.setLogFailuresOnly(false) // "false" by default
          +myProducer.setFlushOnCheckpoint(true) // "false" by default
          +
          +stream.addSink(myProducer)
          {% endhighlight %}

          </div>
          <div data-lang="scala, Kafka 0.10+" markdown="1">

          {% highlight scala %}

          -FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
          +val stream: DataStream[String] = ...
          +
          +val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
          + stream, // input stream
          + "my-topic", // target topic
          + new SimpleStringSchema, // serialization schema
          + properties) // custom configuration for KafkaProducer (including broker list)
          +
          +// the following is necessary for at-least-once delivery guarantee
          +myProducerConfig.setLogFailuresOnly(false) // "false" by default
          +myProducerConfig.setFlushOnCheckpoint(true) // "false" by default

          {% endhighlight %}

          </div>
          </div>

          -You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
          -the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
          -Kafka Producers.
          -
          -Similar to the consumer, the producer also allows using an advanced serialization schema which allows
          -serializing the key and value separately. It also allows to override the target topic id, so that
          -one producer instance can send data to multiple topics.
          -
          -The interface of the serialization schema is called `KeyedSerializationSchema`.
          -
          +The above demonstrates the basic usage of creating a Flink Kafka Producer
          +to write streams to a single Kafka target topic. For more advanced usages, there
          +are other constructor variants that allow providing the following:
          +
          + * Custom configuration for the internal Kafka client:
          + The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
          + Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
          + details on how to configure Kafka Producers.
          + * Custom partitioner: To assign records to specific
          + partitions, you can provide an implementation of a `KafkaProducer` to the
          + constructor. This partitioner will be called for each record in the stream
          + to determine which exact partition the record will be sent to.
          + * Advanced serialization schema: Similar to the consumer,
          + the producer also allows using a advanced serialization schema called `KeyedSerializationSchema`,
          + which allows serializing the key and value separately. It also allows to override the target topic,
          + so that one producer instance can send data to multiple topics.
          +
          +The example also shows how to configure the Flink Kafka Producer for at-least-once
          +guarantees, with the setter methods `setLogFailuresOnly` and `setFlushOnCheckpoint`:
          +
          + * `setLogFailuresOnly`: enabling this will let the producer log failures only
          — End diff –

          I was also thinking about this, actually. Perhaps we should even just enforce these two settings when checkpointing is enabled; otherwise, a `setAtLeastOnce` setting that additionally requires checkpointing to be enabled as a separate configuration seems a bit strange to me (e.g. the user might call `setAtLeastOnce(true)` without ever enabling checkpointing).

          Either way, I prefer to discuss this in https://issues.apache.org/jira/browse/FLINK-5728, and to keep this as is until we reach a conclusion on how to deal with these two settings. I'll incorporate your comment on this in FLINK-5728.
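
          To make the dependency on checkpointing concrete, here is a minimal end-to-end sketch of the at-least-once configuration discussed in this thread, assuming the Flink 1.2-era APIs shown in the diff. The source, host/port, and checkpoint interval are arbitrary placeholders, and the `setAtLeastOnce` mentioned above is hypothetical and does not appear below:

          {% highlight java %}
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
          import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // setFlushOnCheckpoint(true) only has an effect if checkpointing is
          // actually enabled; without this line there is no checkpoint to flush at
          env.enableCheckpointing(5000); // checkpoint every 5 seconds

          DataStream<String> stream = env.socketTextStream("localhost", 9999); // any source

          FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<>(
              "localhost:9092", "my-topic", new SimpleStringSchema());

          myProducer.setLogFailuresOnly(false);  // fail the sink on asynchronous send errors
          myProducer.setFlushOnCheckpoint(true); // block each checkpoint until pending records are acked

          stream.addSink(myProducer);
          env.execute("at-least-once-producer-sketch");
          {% endhighlight %}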

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3282

          Thank you for the review @uce! I'll address all of your comments except the last one (please see my inline reply).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3282

          The comments have been addressed. Apart from the minor grammar fixes, all sections were moved up the hierarchy by one level for a better ToC. The at-least-once info for the producer was moved to a sub-section called "Kafka Producers and Fault Tolerance" (consistent with the naming of the exactly-once info on the consumer side).

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3282

          Makes sense, +1 to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3282

          Merging to `master` and `release-1.2`...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3282

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/cba85db
          Resolved for release-1.2 via http://git-wip-us.apache.org/repos/asf/flink/commit/bcace06

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
            • Watchers:
              4
